[comparison] add info APIs

- add comparison task info & by-index info APIs
- update comparison task permission control
- unify migration task progress calculation
- set start time for cluster migration/comparison tasks
- add split task utilities
- clear index migration/comparison state before running
parent 5825bd0f71
commit b95b1ff703
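Usage sketch (not part of the commit): a minimal Go client for the two new comparison info endpoints registered in the routing hunk below. Host, port, task id, and index name are placeholders, and authentication/permission setup is omitted.

    package main

    import (
        "fmt"
        "io"
        "net/http"
    )

    func main() {
        // Hypothetical host and identifiers; the paths come from the routes below.
        base := "http://localhost:9000"
        taskID := "<task-id>"
        indexName := "<unique-index-name>"

        for _, url := range []string{
            base + "/comparison/data/" + taskID + "/info",
            base + "/comparison/data/" + taskID + "/info/" + indexName,
        } {
            resp, err := http.Get(url)
            if err != nil {
                panic(err)
            }
            body, _ := io.ReadAll(resp.Body)
            resp.Body.Close()
            fmt.Println(url, "->", string(body))
        }
    }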
@@ -30,22 +30,24 @@ import (

 func InitAPI() {
     handler := APIHandler{}
-    api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequirePermission(handler.searchTask("cluster_migration"), enum.PermissionTaskRead))
+    api.HandleAPIMethod(api.GET, "/migration/data/_search", handler.RequirePermission(handler.searchTask("cluster_migration"), enum.PermissionMigrationTaskRead))
-    api.HandleAPIMethod(api.POST, "/migration/data", handler.RequirePermission(handler.createDataMigrationTask, enum.PermissionTaskWrite))
+    api.HandleAPIMethod(api.POST, "/migration/data", handler.RequirePermission(handler.createDataMigrationTask, enum.PermissionMigrationTaskWrite))
-    api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionTaskWrite))
+    api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionMigrationTaskWrite))
+    api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionMigrationTaskWrite))
+    api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionMigrationTaskWrite))
+    api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead))
+    api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead))
+    api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequirePermission(handler.updateDataMigrationTaskStatus, enum.PermissionMigrationTaskRead))
+
+    api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionComparisonTaskRead))
+    api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionComparisonTaskWrite))
+    api.HandleAPIMethod(api.DELETE, "/comparison/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionComparisonTaskWrite))
+    api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info", handler.RequirePermission(handler.getDataComparisonTaskInfo, enum.PermissionComparisonTaskRead))
+    api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info/:index", handler.RequirePermission(handler.getDataComparisonTaskOfIndex, enum.PermissionComparisonTaskRead))
+    api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionComparisonTaskWrite))
+    api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionComparisonTaskWrite))
+
     api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration))
-    api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionTaskWrite))
-    api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionTaskWrite))
-    api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionTaskRead))
-    api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionTaskRead))
-    api.HandleAPIMethod(api.PUT, "/migration/data/:task_id/status", handler.RequirePermission(handler.updateDataMigrationTaskStatus, enum.PermissionTaskRead))
-
-    api.HandleAPIMethod(api.GET, "/comparison/data/_search", handler.RequirePermission(handler.searchTask("cluster_comparison"), enum.PermissionTaskRead))
-    api.HandleAPIMethod(api.POST, "/comparison/data", handler.RequirePermission(handler.createDataComparisonTask, enum.PermissionTaskWrite))
-    api.HandleAPIMethod(api.DELETE, "/comparison/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionTaskWrite))
-    api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionTaskWrite))
-    api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionTaskWrite))
-
     api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo)
     api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_count", handler.countDocuments)
     api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_refresh", handler.refreshIndex)
@@ -236,15 +238,9 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
         return
     }

-    indexState, err := getMajorTaskInfoByIndex(id)
+    _, indexState, err := h.getMigrationMajorTaskInfo(id)
     if err != nil {
-        log.Error(err)
+        log.Errorf("failed to get major task info, err: %v", err)
-        h.WriteError(w, err.Error(), http.StatusInternalServerError)
-        return
-    }
-    realtimeIndexState, err := getMajorTaskByIndexFromES(id)
-    if err != nil {
-        log.Error(err)
         h.WriteError(w, err.Error(), http.StatusInternalServerError)
         return
     }
@@ -252,15 +248,15 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
     var completedIndices int
     for i, index := range taskConfig.Indices {
         indexName := index.Source.GetUniqueIndexName()
-        count := indexState[indexName].IndexDocs + realtimeIndexState[indexName].IndexDocs
+        count := indexState[indexName].IndexDocs
-        percent := count * 100 / float64(index.Source.Docs)
+        percent := float64(count) / float64(index.Source.Docs) * 100
         if percent > 100 {
             percent = 100
         }
-        taskConfig.Indices[i].Target.Docs = int64(count)
+        taskConfig.Indices[i].Target.Docs = count
         taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
         taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
-        if int64(count) == index.Source.Docs {
+        if count == index.Source.Docs {
            completedIndices++
         }
     }
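Note on the hunk above: doc counts are now integers and are converted to float64 before dividing, so partial progress is no longer truncated, and the percentage is capped at 100. A self-contained sketch of the same arithmetic (names are illustrative, not from the codebase):

    package main

    import "fmt"

    // progressPercent mirrors the calculation above: migrated doc count over
    // the source doc count, as a float percentage capped at 100.
    func progressPercent(indexDocs, sourceDocs int64) float64 {
        if sourceDocs <= 0 {
            return 0
        }
        percent := float64(indexDocs) / float64(sourceDocs) * 100
        if percent > 100 {
            percent = 100
        }
        return percent
    }

    func main() {
        // e.g. 7,500 of 10,000 docs migrated -> 75%
        fmt.Println(progressPercent(7500, 10000))
    }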
@@ -279,100 +275,6 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
     h.WriteJSON(w, obj, http.StatusOK)
 }

-func getMajorTaskInfoByIndex(taskID string) (map[string]migration_model.IndexStateInfo, error) {
-    query := util.MapStr{
-        "size": 0,
-        "aggs": util.MapStr{
-            "group_by_task": util.MapStr{
-                "terms": util.MapStr{
-                    "field": "metadata.labels.unique_index_name",
-                    "size": 100,
-                },
-                "aggs": util.MapStr{
-                    "group_by_status": util.MapStr{
-                        "terms": util.MapStr{
-                            "field": "status",
-                            "size": 100,
-                        },
-                    },
-                    "total_docs": util.MapStr{
-                        "sum": util.MapStr{
-                            "field": "metadata.labels.index_docs",
-                        },
-                    },
-                },
-            },
-        },
-        "query": util.MapStr{
-            "bool": util.MapStr{
-                "must": []util.MapStr{
-                    {
-                        "term": util.MapStr{
-                            "metadata.type": util.MapStr{
-                                "value": "index_migration",
-                            },
-                        },
-                    },
-                    {
-                        "term": util.MapStr{
-                            "parent_id": util.MapStr{
-                                "value": taskID,
-                            },
-                        },
-                    },
-                },
-            },
-        },
-    }
-    q := &orm.Query{
-        RawQuery: util.MustToJSONBytes(query),
-    }
-    err, result := orm.Search(task2.Task{}, q)
-    if err != nil {
-        return nil, err
-    }
-
-    searchRes := &elastic.SearchResponse{}
-    err = util.FromJSONBytes(result.Raw, searchRes)
-    if err != nil {
-        return nil, err
-    }
-    resBody := map[string]migration_model.IndexStateInfo{}
-
-    if taskAgg, ok := searchRes.Aggregations["group_by_task"]; ok {
-        for _, bk := range taskAgg.Buckets {
-            if key, ok := bk["key"].(string); ok {
-                //resBody[key] = int(bk["doc_count"].(float64))
-                resBody[key] = migration_model.IndexStateInfo{}
-                if statusAgg, ok := bk["group_by_status"].(map[string]interface{}); ok {
-                    if sbks, ok := statusAgg["buckets"].([]interface{}); ok {
-                        for _, sbk := range sbks {
-                            if sbkM, ok := sbk.(map[string]interface{}); ok {
-                                if sbkM["key"] == task2.StatusError {
-                                    if v, ok := sbkM["doc_count"].(float64); ok {
-                                        st := resBody[key]
-                                        st.ErrorPartitions = int(v)
-                                        resBody[key] = st
-                                    }
-                                }
-                            }
-                        }
-                    }
-                }
-                if indexDocsAgg, ok := bk["total_docs"].(map[string]interface{}); ok {
-                    if v, ok := indexDocsAgg["value"].(float64); ok {
-                        st := resBody[key]
-                        st.IndexDocs = v
-                        resBody[key] = st
-                    }
-                }
-            }
-        }
-    }
-    return resBody, nil
-}
-
 func getIndexTaskDocCount(ctx context.Context, index *migration_model.ClusterMigrationIndexConfig, targetESClient elastic.API) (int64, error) {
     targetIndexName := index.Target.Name
     if targetIndexName == "" {
@@ -414,6 +316,17 @@ func getIndexTaskDocCount(ctx context.Context, index *migration_model.ClusterMig
     return countRes.Count, nil
 }

+type TaskInfoResponse struct {
+    TaskID              string        `json:"task_id"`
+    Step                interface{}   `json:"step"`
+    StartTime           int64         `json:"start_time"`
+    CompletedTime       int64         `json:"completed_time"`
+    Duration            int64         `json:"duration"`
+    DataPartition       int           `json:"data_partition"`
+    CompletedPartitions int           `json:"completed_partitions"`
+    Partitions          []util.MapStr `json:"partitions"`
+}
+
 func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
     id := ps.MustGetParameter("task_id")
     uniqueIndexName := ps.MustGetParameter("index")
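For reference, serialized through the json tags above, the by-index info endpoints respond with roughly this shape (all values invented for illustration):

    {
      "task_id": "<task-id>",
      "step": 86400,
      "start_time": 1681000000000,
      "completed_time": 1681000600000,
      "duration": 600000,
      "data_partition": 8,
      "completed_partitions": 8,
      "partitions": [ ... ]
    }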
@@ -425,122 +338,37 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
         return
     }

-    var completedTime int64
-    taskInfo := util.MapStr{
-        "task_id":    id,
-        "start_time": majorTask.StartTimeInMillis,
-    }
-    partitionTaskQuery := util.MapStr{
-        "size": 500,
-        "sort": []util.MapStr{
-            {
-                "created": util.MapStr{
-                    "order": "asc",
-                },
-            },
-        },
-        "query": util.MapStr{
-            "bool": util.MapStr{
-                "must": []util.MapStr{
-                    {
-                        "term": util.MapStr{
-                            "parent_id": util.MapStr{
-                                "value": id,
-                            },
-                        },
-                    },
-                    {
-                        "term": util.MapStr{
-                            "metadata.labels.unique_index_name": util.MapStr{
-                                "value": uniqueIndexName,
-                            },
-                        },
-                    },
-                },
-            },
-        },
-    }
-    q := &orm.Query{
-        RawQuery: util.MustToJSONBytes(partitionTaskQuery),
-    }
-    err, result := orm.Search(task2.Task{}, q)
-    if err != nil {
-        log.Error(err)
-        h.WriteError(w, err.Error(), http.StatusInternalServerError)
+    taskInfo := &TaskInfoResponse{
+        TaskID:    id,
+        StartTime: majorTask.StartTimeInMillis,
+    }
+
+    subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(&majorTask, uniqueIndexName)
+    taskInfo.DataPartition = len(subTasks)
+    if len(subTasks) == 0 {
+        h.WriteJSON(w, taskInfo, http.StatusOK)
         return
     }

-    var subTasks []task2.Task
-    var pipelineTaskIDs = map[string][]string{}
-    pipelineSubParentIDs := map[string]string{}
-    subTaskStatus := map[string]string{}
-    parentIDPipelineTasks := map[string][]task2.Task{}
-    for _, row := range result.Result {
-        buf := util.MustToJSONBytes(row)
-        subTask := task2.Task{}
-        err = util.FromJSONBytes(buf, &subTask)
-        if err != nil {
-            log.Error(err)
-            continue
-        }
-        if subTask.Metadata.Labels != nil {
-            if subTask.Metadata.Type == "index_migration" {
-                subTasks = append(subTasks, subTask)
-                subTaskStatus[subTask.ID] = subTask.Status
-                continue
-            }
-            if subTask.Metadata.Labels["pipeline_id"] == "es_scroll" || subTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
-                if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok {
-                    pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
-                    if pl := len(subTask.ParentId); pl > 0 {
-                        parentID := subTask.ParentId[pl-1]
-                        if subTaskStatus[parentID] == task2.StatusRunning {
-                            pipelineSubParentIDs[subTask.ID] = parentID
-                        }
-                        parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask)
-                    }
-                }
-            }
-        }
-    }
-    taskInfo["data_partition"] = len(subTasks)
     var scrollStats = map[string]int64{}
     var bulkStats = map[string]int64{}
-    for instID, taskIDs := range pipelineTaskIDs {
-        inst := &model.Instance{}
-        inst.ID = instID
-        _, err = orm.Get(inst)
-        if err != nil {
-            log.Error(err)
-            continue
-        }
-        pipelines, err := inst.GetPipelinesByIDs(taskIDs)
-        if err != nil {
-            log.Error(err)
-            continue
-        }
-
-        for pipelineID, status := range pipelines {
-            if pid, ok := pipelineSubParentIDs[pipelineID]; ok {
-                if vv := migration_util.GetMapIntValue(status.Context, "es_scroll.scrolled_docs"); vv > 0 {
-                    scrollStats[pid] = vv
-                }
-                if vv := migration_util.GetMapIntValue(status.Context, "bulk_indexing.success.count"); vv > 0 {
-                    bulkStats[pid] = vv
-                }
-            }
-        }
-    }
-
-    var (
-        partitionTaskInfos []util.MapStr
-        completedPartitions int
-        startTime int64
-    )
-    if len(subTasks) > 0 {
-        startTime = subTasks[0].StartTimeInMillis
-    }
+    pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
+    for pipelineID, pipelineContext := range pipelineContexts {
+        if pid, ok := pipelineSubParentIDs[pipelineID]; ok {
+            if vv := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs"); vv > 0 {
+                scrollStats[pid] = vv
+            }
+            if vv := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count"); vv > 0 {
+                bulkStats[pid] = vv
+            }
+        }
+    }
+    startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks)
+    var partitionTaskInfos []util.MapStr
     for i, ptask := range subTasks {
         cfg := migration_model.IndexMigrationTaskConfig{}
         err := migration_util.GetTaskConfig(&ptask, &cfg)
@@ -548,20 +376,20 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
             log.Errorf("failed to get task config, err: %v", err)
             continue
         }
-        start := cfg.Source.Start
-        end := cfg.Source.End
         if i == 0 {
-            step := cfg.Source.Step
-            taskInfo["step"] = step
+            taskInfo.Step = cfg.Source.Step
         }
-        var durationInMS int64 = 0
+        var durationInMS int64
+        var subCompletedTime int64
         if ptask.StartTimeInMillis > 0 {
-            if ptask.StartTimeInMillis < startTime {
-                startTime = ptask.StartTimeInMillis
-            }
+            if migration_util.IsPendingState(ptask.Status) {
                 durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis
-            if ptask.CompletedTime != nil && (ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError) {
-                durationInMS = ptask.CompletedTime.UnixMilli() - ptask.StartTimeInMillis
+                continue
+            }
+            if ptask.CompletedTime != nil {
+                subCompletedTime = ptask.CompletedTime.UnixMilli()
+                durationInMS = subCompletedTime - ptask.StartTimeInMillis
             }
         }
         var (
@@ -580,54 +408,42 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
             indexDocs = migration_util.GetMapIntValue(ptaskLabels, "index_docs")
         }

-        var subCompletedTime int64
-        if ptask.CompletedTime != nil {
-            subCompletedTime = ptask.CompletedTime.UnixMilli()
-            if subCompletedTime > completedTime {
-                completedTime = subCompletedTime
-            }
-        }
-
         partitionTotalDocs := cfg.Source.DocCount
         partitionTaskInfo := util.MapStr{
             "task_id":        ptask.ID,
             "status":         ptask.Status,
             "start_time":     ptask.StartTimeInMillis,
             "completed_time": subCompletedTime,
-            "start":          start,
-            "end":            end,
+            "start":          cfg.Source.Start,
+            "end":            cfg.Source.End,
             "duration":       durationInMS,
             "scroll_docs":    scrollDocs,
             "index_docs":     indexDocs,
             "total_docs":     partitionTotalDocs,
         }
-        for _, pipelineTask := range parentIDPipelineTasks[ptask.ID] {
-            if pipelineTask.Metadata.Labels["pipeline_id"] == "es_scroll" {
-                partitionTaskInfo["scroll_task"] = util.MapStr{
-                    "id":     pipelineTask.ID,
-                    "status": pipelineTask.Status,
-                }
-            } else if pipelineTask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
-                partitionTaskInfo["bulk_task"] = util.MapStr{
-                    "id":     pipelineTask.ID,
-                    "status": pipelineTask.Status,
-                }
-            }
-        }
+        scrollTask, bulkTask := migration_util.SplitIndexMigrationTasks(parentIDPipelineTasks[ptask.ID])
+        if scrollTask != nil {
+            partitionTaskInfo["scroll_task"] = util.MapStr{
+                "id":     scrollTask.ID,
+                "status": scrollTask.Status,
+            }
+        }
+        if bulkTask != nil {
+            partitionTaskInfo["bulk_task"] = util.MapStr{
+                "id":     bulkTask.ID,
+                "status": bulkTask.Status,
+            }
+        }
         partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo)
-        if ptask.Status == task2.StatusComplete || ptask.Status == task2.StatusError {
-            completedPartitions++
-        }
     }
-    if len(subTasks) == completedPartitions {
-        taskInfo["completed_time"] = completedTime
-        taskInfo["duration"] = completedTime - startTime
-    } else {
-        taskInfo["duration"] = time.Now().UnixMilli() - startTime
-    }
-    taskInfo["start_time"] = startTime
-    taskInfo["partitions"] = partitionTaskInfos
-    taskInfo["completed_partitions"] = completedPartitions
+    taskInfo.CompletedTime = completedTime
+    taskInfo.Duration = duration
+    // NOTE: overwrite major task start time with the first started sub task
+    if taskInfo.StartTime == 0 {
+        taskInfo.StartTime = startTime
+    }
+    taskInfo.Partitions = partitionTaskInfos
+    taskInfo.CompletedPartitions = completedPartitions
     h.WriteJSON(w, taskInfo, http.StatusOK)
 }

@@ -828,48 +644,30 @@ func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps h
     }, 200)
 }

-func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_model.MajorTaskState, err error) {
-    taskQuery := util.MapStr{
-        "size": 500,
+func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexName string) (subTasks []task2.Task, pipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task2.Task, err error) {
+    queryDsl := util.MapStr{
+        "size": 9999,
+        "sort": []util.MapStr{
+            {
+                "created": util.MapStr{
+                    "order": "asc",
+                },
+            },
+        },
         "query": util.MapStr{
             "bool": util.MapStr{
                 "must": []util.MapStr{
                     {
                         "term": util.MapStr{
                             "parent_id": util.MapStr{
-                                "value": majorTaskID,
+                                "value": taskItem.ID,
                             },
                         },
                     },
-                    {
-                        "bool": util.MapStr{
-                            "minimum_should_match": 1,
-                            "should": []util.MapStr{
-                                {
-                                    "term": util.MapStr{
-                                        "metadata.labels.pipeline_id": util.MapStr{
-                                            "value": "bulk_indexing",
-                                        },
-                                    },
-                                },
-                                {
-                                    "bool": util.MapStr{
-                                        "must": []util.MapStr{
-                                            {
-                                                "term": util.MapStr{
-                                                    "metadata.type": util.MapStr{
-                                                        "value": "index_migration",
-                                                    },
-                                                },
-                                            },
-                                            {
-                                                "terms": util.MapStr{
-                                                    "status": []string{task2.StatusComplete, task2.StatusError},
-                                                },
-                                            },
-                                        },
-                                    },
-                                },
-                            },
-                        },
-                    },
+                    {
+                        "term": util.MapStr{
+                            "metadata.labels.unique_index_name": util.MapStr{
+                                "value": uniqueIndexName,
+                            },
+                        },
+                    },
                 },
             },
         },
@@ -878,13 +676,17 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod
         },
     }
     q := &orm.Query{
-        RawQuery: util.MustToJSONBytes(taskQuery),
+        RawQuery: util.MustToJSONBytes(queryDsl),
     }
     err, result := orm.Search(task2.Task{}, q)
     if err != nil {
-        return taskStats, err
+        return
     }
-    var pipelineTaskIDs = map[string][]string{}
+    pipelineTaskIDs = map[string][]string{}
+    pipelineSubParentIDs = map[string]string{}
+    parentIDPipelineTasks = map[string][]task2.Task{}
+
     for _, row := range result.Result {
         buf := util.MustToJSONBytes(row)
         subTask := task2.Task{}
@@ -893,114 +695,90 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod
             log.Error(err)
             continue
         }
-        taskLabels := util.MapStr(subTask.Metadata.Labels)
-        if subTask.Metadata.Labels != nil {
-            //add indexDocs of already complete
-            if subTask.Metadata.Type == "index_migration" {
-                taskStats.IndexDocs += migration_util.GetMapIntValue(taskLabels, "index_docs")
-                continue
-            }
-            if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
-                pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
-            }
-        }
-    }
+        if subTask.Metadata.Type != "pipeline" {
+            subTasks = append(subTasks, subTask)
+            continue
+        }
+        if subTask.Status != task2.StatusRunning {
+            continue
+        }
+
+        // TODO: use more robust logic
+        if pl := len(subTask.ParentId); pl != 2 {
+            continue
+        }
+        parentID := subTask.ParentId[1]
+
+        pipelineSubParentIDs[subTask.ID] = parentID
+        instID := migration_util.GetMapStringValue(util.MapStr(subTask.Metadata.Labels), "execution_instance_id")
+        if instID == "" {
+            continue
+        }
+        pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
+        parentIDPipelineTasks[parentID] = append(parentIDPipelineTasks[parentID], subTask)
+    }
+
+    return
+}
+
+func (h *APIHandler) getChildPipelineInfosFromGateway(pipelineTaskIDs map[string][]string) (pipelineContexts map[string]util.MapStr) {
+    pipelineContexts = map[string]util.MapStr{}
+    var err error
+
     for instID, taskIDs := range pipelineTaskIDs {
         inst := &model.Instance{}
         inst.ID = instID
         _, err = orm.Get(inst)
         if err != nil {
-            log.Error(err)
+            log.Error("failed to get instance info, err: %v", err)
             continue
         }
         pipelines, err := inst.GetPipelinesByIDs(taskIDs)
         if err != nil {
-            log.Error(err)
+            log.Errorf("failed to get pipelines info, err: %v", err)
             continue
         }
-
-        for _, status := range pipelines {
-            taskStats.IndexDocs += migration_util.GetMapIntValue(status.Context, "bulk_indexing.success.count")
-        }
-    }
-    return taskStats, nil
-}
-
-func getMajorTaskByIndexFromES(majorTaskID string) (map[string]migration_model.IndexStateInfo, error) {
-    taskQuery := util.MapStr{
-        "size": 500,
-        "query": util.MapStr{
-            "bool": util.MapStr{
-                "must": []util.MapStr{
-                    {
-                        "term": util.MapStr{
-                            "parent_id": util.MapStr{
-                                "value": majorTaskID,
-                            },
-                        },
-                    },
-                    {
-                        "term": util.MapStr{
-                            "metadata.labels.pipeline_id": util.MapStr{
-                                "value": "bulk_indexing",
-                            },
-                        },
-                    },
-                },
-            },
-        },
-    }
-    q := &orm.Query{
-        RawQuery: util.MustToJSONBytes(taskQuery),
-    }
-    err, result := orm.Search(task2.Task{}, q)
-    if err != nil {
-        return nil, err
-    }
-    var pipelineTaskIDs = map[string][]string{}
-    var pipelineIndexNames = map[string]string{}
-    for _, row := range result.Result {
-        buf := util.MustToJSONBytes(row)
-        subTask := task2.Task{}
-        err = util.FromJSONBytes(buf, &subTask)
-        if err != nil {
-            log.Error(err)
-            continue
-        }
-        if subTask.Metadata.Labels != nil {
-            if instID, ok := subTask.Metadata.Labels["execution_instance_id"].(string); ok {
-                pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
-            }
-            if indexName, ok := subTask.Metadata.Labels["unique_index_name"].(string); ok {
-                pipelineIndexNames[subTask.ID] = indexName
-            }
-        }
-    }
-    state := map[string]migration_model.IndexStateInfo{}
-    for instID, taskIDs := range pipelineTaskIDs {
-        inst := &model.Instance{}
-        inst.ID = instID
-        _, err = orm.Get(inst)
-        if err != nil {
-            log.Error(err)
-            continue
-        }
-        pipelines, err := inst.GetPipelinesByIDs(taskIDs)
-        if err != nil {
-            log.Error(err)
-            continue
-        }
-
         for pipelineID, status := range pipelines {
-            indexName := pipelineIndexNames[pipelineID]
-            if v, err := status.Context.GetValue("bulk_indexing.success.count"); err == nil {
-                if vv, ok := v.(float64); ok && indexName != "" {
-                    st := state[indexName]
-                    st.IndexDocs += vv
-                    state[indexName] = st
-                }
-            }
+            pipelineContexts[pipelineID] = status.Context
         }
     }
-    return state, nil
+
+    return
+}
+
+func (h *APIHandler) calcMajorTaskInfo(subTasks []task2.Task) (startTime int64, completedTime int64, duration int64, completedPartitions int) {
+    if len(subTasks) == 0 {
+        return
+    }
+
+    for _, subTask := range subTasks {
+        if subTask.StartTimeInMillis > 0 {
+            if startTime == 0 {
+                startTime = subTask.StartTimeInMillis
+            }
+            if subTask.StartTimeInMillis < startTime {
+                startTime = subTask.StartTimeInMillis
+            }
+        }
+        if subTask.CompletedTime != nil {
+            subCompletedTime := subTask.CompletedTime.UnixMilli()
+            if subCompletedTime > completedTime {
+                completedTime = subCompletedTime
+            }
+        }
+
+        if subTask.Status == task2.StatusComplete || subTask.Status == task2.StatusError {
+            completedPartitions++
+        }
+    }
+    if len(subTasks) != completedPartitions {
+        completedTime = 0
+        duration = time.Now().UnixMilli() - startTime
+    } else {
+        duration = completedTime - startTime
+    }
+
+    return
 }
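Note on calcMajorTaskInfo above: the aggregate start time is the earliest sub-task start, the completed time is the latest sub-task completion, and the duration keeps growing until every partition reaches a terminal status. A runnable re-implementation of that logic under simplified types (illustrative, not the commit's code):

    package main

    import (
        "fmt"
        "time"
    )

    // sub mirrors the fields calcMajorTaskInfo reads from a task: when it
    // started, when (if ever) it completed, and whether it reached a
    // terminal status (complete/error).
    type sub struct {
        startMillis int64
        completed   *time.Time
        terminal    bool
    }

    // calc re-implements the aggregation: earliest start, latest completion,
    // and a duration that keeps running until every partition is terminal.
    func calc(subs []sub) (start, completed, duration int64, done int) {
        if len(subs) == 0 {
            return
        }
        for _, s := range subs {
            if s.startMillis > 0 && (start == 0 || s.startMillis < start) {
                start = s.startMillis
            }
            if s.completed != nil {
                if c := s.completed.UnixMilli(); c > completed {
                    completed = c
                }
            }
            if s.terminal {
                done++
            }
        }
        if done != len(subs) {
            // still running: no overall completed time yet
            completed = 0
            duration = time.Now().UnixMilli() - start
        } else {
            duration = completed - start
        }
        return
    }

    func main() {
        t := time.UnixMilli(1_000_600)
        start, completed, duration, done := calc([]sub{
            {startMillis: 1_000_000, completed: &t, terminal: true},
        })
        fmt.Println(start, completed, duration, done) // 1000000 1000600 600 1
    }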
@@ -55,6 +55,9 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error {
         return nil
     }
     taskItem.RetryTimes++
+    if taskItem.StartTimeInMillis == 0 {
+        taskItem.StartTimeInMillis = time.Now().UnixMilli()
+    }
     taskItem.Status = task.StatusRunning
     p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
         Success: true,
@@ -102,6 +105,9 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
         }
     }

+    // TODO: partition twice for source & target, then merge
+    // if there's a partition missing from source but present in target
+    // ideally we can capture it in docs count, but this won't always work
     if index.Partition != nil {
         partitionQ := &elastic.PartitionQuery{
             IndexName: index.Source.Name,
@@ -119,18 +125,20 @@
     if partitions == nil || len(partitions) == 0 {
         return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter))
     }
+    var (
+        partitionID int
+    )
     for _, partition := range partitions {
-        //skip empty partition
-        if partition.Docs <= 0 {
-            continue
-        }
+        partitionID++
         partitionSourceDump := sourceDump
-        partitionSourceDump.QueryDSL = partition.Filter
+        partitionSourceDump.Start = partition.Start
+        partitionSourceDump.End = partition.End
         partitionSourceDump.DocCount = partition.Docs
+        partitionSourceDump.Step = index.Partition.Step
+        partitionSourceDump.PartitionId = partitionID
+        partitionSourceDump.QueryDSL = partition.Filter
         partitionSourceDump.QueryString = ""

-        // TODO: if there's a partition missing from source but present in target
-        // ideally we can capture it in docs count, but this won't always work
         partitionTargetDump := partitionSourceDump
         partitionTargetDump.Indices = index.Target.Name
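Note on the hunk above: each partition now carries explicit Start/End bounds, the partition Step, and a 1-based PartitionId in addition to the query filter, and empty partitions are no longer skipped. A minimal sketch of how such bounds can be derived for a numeric field (illustrative assumptions: fixed step, half-open ranges):

    package main

    import "fmt"

    // Cut a numeric range into fixed-size partitions, each carrying
    // start/end bounds, a step, and a 1-based partition id -- the same
    // fields the dump config now records (Start, End, Step, PartitionId).
    func main() {
        var (
            min, max int64 = 0, 2500
            step     int64 = 1000
        )
        partitionID := 0
        for start := min; start < max; start += step {
            partitionID++
            end := start + step
            if end > max {
                end = max
            }
            fmt.Printf("partition %d: [%d, %d) step=%d\n", partitionID, start, end, step)
        }
    }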
@@ -166,6 +174,7 @@
     sourceDump.DocCount = index.Source.Docs
     targetDump := sourceDump
     targetDump.Indices = index.Target.Name
+    targetDump.DocCount = index.Target.Docs

     indexComparisonTask := task.Task{
         ParentId: []string{taskItem.ID},
@@ -57,6 +57,9 @@ func (p *processor) handleReadyMajorTask(taskItem *task.Task) error {
         return nil
     }
     taskItem.RetryTimes++
+    if taskItem.StartTimeInMillis == 0 {
+        taskItem.StartTimeInMillis = time.Now().UnixMilli()
+    }
     taskItem.Status = task.StatusRunning
     p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
         Success: true,
@@ -284,7 +287,7 @@ func (p *processor) handleRunningMajorTask(taskItem *task.Task) error {
     return nil
 }

-func (p *processor) getMajorTaskState(majorTask *task.Task) (taskState migration_model.MajorTaskState, err error) {
+func (p *processor) getMajorTaskState(majorTask *task.Task) (taskState migration_model.ClusterMigrationTaskState, err error) {
     query := util.MapStr{
         "size": 0,
         "aggs": util.MapStr{
@@ -8,7 +8,6 @@ import (

     log "github.com/cihub/seelog"

-    migration_model "infini.sh/console/plugin/migration/model"
     migration_util "infini.sh/console/plugin/migration/util"

     httprouter "infini.sh/framework/core/api/router"
@@ -88,29 +87,40 @@ func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req
         }
         for _, hit := range searchRes.Hits.Hits {
             sourceM := util.MapStr(hit.Source)
-            buf := util.MustToJSONBytes(sourceM["config"])
-            dataConfig := migration_model.ClusterMigrationTaskConfig{}
-            err = util.FromJSONBytes(buf, &dataConfig)
-            if err != nil {
-                log.Error(err)
-                continue
-            }
-            //var targetTotalDocs int64
-            if hit.Source["status"] == task.StatusRunning {
-                ts, err := getMajorTaskStatsFromInstances(hit.ID)
-                if err != nil {
-                    log.Warnf("fetch progress info of task error: %v", err)
-                    continue
-                }
-                sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs)
-            }
-
+            h.populateMajorTaskInfo(hit.ID, sourceM)
         }

         h.WriteJSON(w, searchRes, http.StatusOK)
     }
 }

+func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
+    buf := util.MustToJSONBytes(sourceM)
+    majorTask := task.Task{}
+    err := util.FromJSONBytes(buf, &majorTask)
+    if err != nil {
+        log.Errorf("failed to unmarshal major task info, err: %v", err)
+        return
+    }
+    switch majorTask.Metadata.Type {
+    case "cluster_migration":
+        ts, _, err := h.getMigrationMajorTaskInfo(taskID)
+        if err != nil {
+            log.Warnf("fetch progress info of task error: %v", err)
+            return
+        }
+        sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs)
+    case "cluster_comparison":
+        ts, _, err := h.getComparisonMajorTaskInfo(taskID)
+        if err != nil {
+            log.Warnf("fetch progress info of task error: %v", err)
+            return
+        }
+        sourceM.Put("metadata.labels.source_scroll_docs", ts.SourceScrollDocs)
+        sourceM.Put("metadata.labels.target_scroll_docs", ts.TargetScrollDocs)
+    }
+}
+
 func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
     taskID := ps.MustGetParameter("task_id")
     obj := task.Task{}
@@ -1,15 +1,19 @@
 package migration

 import (
+    "fmt"
     "net/http"
+    "time"

     log "github.com/cihub/seelog"

     migration_model "infini.sh/console/plugin/migration/model"
+    migration_util "infini.sh/console/plugin/migration/util"

     "infini.sh/framework/core/api/rbac"
     httprouter "infini.sh/framework/core/api/router"
     "infini.sh/framework/core/elastic"
+    "infini.sh/framework/core/global"
     "infini.sh/framework/core/orm"
     "infini.sh/framework/core/task"
     "infini.sh/framework/core/util"
@@ -38,9 +42,11 @@ func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.R
         clusterTaskConfig.Creator.Id = user.UserId
     }

-    var totalDocs int64
+    var sourceTotalDocs int64
+    var targetTotalDocs int64
     for _, index := range clusterTaskConfig.Indices {
-        totalDocs += index.Source.Docs
+        sourceTotalDocs += index.Source.Docs
+        targetTotalDocs += index.Target.Docs
     }

     srcClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Source.Id)
@@ -54,7 +60,8 @@ func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.R
             "business_id":       "cluster_comparison",
             "source_cluster_id": clusterTaskConfig.Cluster.Source.Id,
             "target_cluster_id": clusterTaskConfig.Cluster.Target.Id,
-            "source_total_docs": totalDocs,
+            "source_total_docs": sourceTotalDocs,
+            "target_total_docs": targetTotalDocs,
         },
     },
     Cancellable: true,
@@ -71,3 +78,275 @@ func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.R
     }
     h.WriteCreatedOKJSON(w, t.ID)
 }
+
+func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+    id := ps.MustGetParameter("task_id")
+
+    obj := task.Task{}
+    obj.ID = id
+
+    exists, err := orm.Get(&obj)
+    if !exists || err != nil {
+        h.WriteJSON(w, util.MapStr{
+            "_id":   id,
+            "found": false,
+        }, http.StatusNotFound)
+        return
+    }
+    taskConfig := &migration_model.ClusterComparisonTaskConfig{}
+    err = migration_util.GetTaskConfig(&obj, taskConfig)
+    if err != nil {
+        log.Error(err)
+        h.WriteError(w, err.Error(), http.StatusInternalServerError)
+        return
+    }
+
+    _, indexState, err := h.getComparisonMajorTaskInfo(id)
+    if err != nil {
+        log.Errorf("failed to get major task info, err: %v", err)
+        h.WriteError(w, err.Error(), http.StatusInternalServerError)
+        return
+    }
+
+    var completedIndices int
+    for i, index := range taskConfig.Indices {
+        indexName := index.Source.GetUniqueIndexName()
+        count := indexState[indexName].SourceScrollDocs + indexState[indexName].TargetScrollDocs
+        percent := float64(count) / float64(index.Source.Docs+index.Target.Docs) * 100
+        if percent > 100 {
+            percent = 100
+        }
+        taskConfig.Indices[i].Target.Docs = count
+        taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2)
+        taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
+        if count == index.Source.Docs {
+            completedIndices++
+        }
+    }
+    cfg := global.MustLookup("cluster_migration_config")
+    if migrationConfig, ok := cfg.(*DispatcherConfig); ok {
+        if obj.Metadata.Labels == nil {
+            obj.Metadata.Labels = util.MapStr{}
+        }
+        obj.Metadata.Labels["log_info"] = util.MapStr{
+            "cluster_id": migrationConfig.Elasticsearch,
+            "index_name": migrationConfig.LogIndexName,
+        }
+    }
+    obj.ConfigString = util.MustToJSON(taskConfig)
+    obj.Metadata.Labels["completed_indices"] = completedIndices
+    h.WriteJSON(w, obj, http.StatusOK)
+}
+
+type ComparisonIndexStateInfo struct {
+    ErrorPartitions  int
+    SourceScrollDocs int64
+    TargetScrollDocs int64
+}
+
+// TODO: calc realtime info from instance
+func (h *APIHandler) getComparisonMajorTaskInfo(majorTaskID string) (taskStats migration_model.ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) {
+    taskQuery := util.MapStr{
+        "size": 500,
+        "query": util.MapStr{
+            "bool": util.MapStr{
+                "must": []util.MapStr{
+                    {
+                        "term": util.MapStr{
+                            "parent_id": util.MapStr{
+                                "value": majorTaskID,
+                            },
+                        },
+                    },
+                    {
+                        "bool": util.MapStr{
+                            "minimum_should_match": 1,
+                            "should": []util.MapStr{
+                                {
+                                    "term": util.MapStr{
+                                        "metadata.labels.pipeline_id": util.MapStr{
+                                            "value": "dump_hash",
+                                        },
+                                    },
+                                },
+                                {
+                                    "bool": util.MapStr{
+                                        "must": []util.MapStr{
+                                            {
+                                                "term": util.MapStr{
+                                                    "metadata.type": util.MapStr{
+                                                        "value": "index_comparison",
+                                                    },
+                                                },
+                                            },
+                                            {
+                                                "terms": util.MapStr{
+                                                    "status": []string{task.StatusComplete, task.StatusError},
+                                                },
+                                            },
+                                        },
+                                    },
+                                },
+                            },
+                        },
+                    },
+                },
+            },
+        },
+    }
+    q := &orm.Query{
+        RawQuery: util.MustToJSONBytes(taskQuery),
+    }
+    err, result := orm.Search(task.Task{}, q)
+    if err != nil {
+        return taskStats, indexState, err
+    }
+
+    var pipelineIndexNames = map[string]string{}
+    indexState = map[string]ComparisonIndexStateInfo{}
+    for _, row := range result.Result {
+        buf := util.MustToJSONBytes(row)
+        subTask := task.Task{}
+        err := util.FromJSONBytes(buf, &subTask)
+        if err != nil {
+            log.Errorf("failed to unmarshal task, err: %v", err)
+            continue
+        }
+        if subTask.Metadata.Labels == nil {
+            continue
+        }
+        taskLabels := util.MapStr(subTask.Metadata.Labels)
+        indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
+        if indexName == "" {
+            continue
+        }
+
+        // add indexDocs of already complete/error
+        if subTask.Metadata.Type == "index_comparison" {
+            sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled")
+            targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled")
+            taskStats.SourceScrollDocs += sourceDocs
+            taskStats.TargetScrollDocs += targetDocs
+            st := indexState[indexName]
+            st.SourceScrollDocs += sourceDocs
+            st.TargetScrollDocs += targetDocs
+            if subTask.Status == task.StatusError {
+                st.ErrorPartitions += 1
+            }
+            indexState[indexName] = st
+            continue
+        }
+        pipelineIndexNames[subTask.ID] = indexName
+    }
+
+    return taskStats, indexState, nil
+}
+
+func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+    id := ps.MustGetParameter("task_id")
+    uniqueIndexName := ps.MustGetParameter("index")
+    majorTask := task.Task{}
+    majorTask.ID = id
+    exists, err := orm.Get(&majorTask)
+    if !exists || err != nil {
+        h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusInternalServerError)
+        return
+    }
+
+    taskInfo := &TaskInfoResponse{
+        TaskID:    id,
+        StartTime: majorTask.StartTimeInMillis,
+    }
+
+    subTasks, pipelineTaskIDs, _, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(&majorTask, uniqueIndexName)
+
+    taskInfo.DataPartition = len(subTasks)
+    if len(subTasks) == 0 {
+        h.WriteJSON(w, taskInfo, http.StatusOK)
+        return
+    }
+
+    pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
+    startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks)
+
+    var partitionTaskInfos []util.MapStr
+
+    for i, subTask := range subTasks {
+        cfg := migration_model.IndexComparisonTaskConfig{}
+        err := migration_util.GetTaskConfig(&subTask, &cfg)
+        if err != nil {
+            log.Errorf("failed to get task config, err: %v", err)
+            continue
+        }
+        if i == 0 {
+            taskInfo.Step = cfg.Source.Step
+        }
+
+        var durationInMS int64
+        var subCompletedTime int64
+        if subTask.StartTimeInMillis > 0 {
+            if migration_util.IsPendingState(subTask.Status) {
+                durationInMS = time.Now().UnixMilli() - subTask.StartTimeInMillis
+                continue
+            }
+            if subTask.CompletedTime != nil {
+                subCompletedTime = subTask.CompletedTime.UnixMilli()
+                durationInMS = subCompletedTime - subTask.StartTimeInMillis
+            }
+        }
+        subTaskLabels := util.MapStr(subTask.Metadata.Labels)
+        sourceScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "source_scrolled")
+        targetScrollDocs := migration_util.GetMapIntValue(subTaskLabels, "target_scrolled")
+
+        partitionTaskInfo := util.MapStr{
+            "task_id":           subTask.ID,
+            "status":            subTask.Status,
+            "start_time":        subTask.StartTimeInMillis,
+            "completed_time":    subCompletedTime,
+            "start":             cfg.Source.Start,
+            "end":               cfg.Source.End,
+            "duration":          durationInMS,
+            "source_total_docs": cfg.Source.DocCount,
+            "target_total_docs": cfg.Target.DocCount,
+        }
+        sourceDumpTask, targetDumpTask, _ := migration_util.SplitIndexComparisonTasks(parentIDPipelineTasks[subTask.ID], &cfg)
+        if sourceDumpTask != nil {
+            partitionTaskInfo["source_scroll_task"] = util.MapStr{
+                "id":     sourceDumpTask.ID,
+                "status": sourceDumpTask.Status,
+            }
+            pipelineID := sourceDumpTask.ID
+            pipelineContext, ok := pipelineContexts[pipelineID]
+            if ok {
+                if vv := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs"); vv > 0 {
+                    sourceScrollDocs = vv
+                }
+            }
+        }
+        if targetDumpTask != nil {
+            partitionTaskInfo["target_scroll_task"] = util.MapStr{
+                "id":     targetDumpTask.ID,
+                "status": targetDumpTask.Status,
+            }
+            pipelineID := targetDumpTask.ID
+            pipelineContext, ok := pipelineContexts[pipelineID]
+            if ok {
+                if vv := migration_util.GetMapIntValue(pipelineContext, "dump_hash.scrolled_docs"); vv > 0 {
+                    targetScrollDocs = vv
+                }
+            }
+        }
+        partitionTaskInfo["source_scroll_docs"] = sourceScrollDocs
+        partitionTaskInfo["target_scroll_docs"] = targetScrollDocs
+        partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo)
+    }
+    taskInfo.CompletedTime = completedTime
+    taskInfo.Duration = duration
+    // NOTE: overwrite major task start time with the first started sub task
+    if taskInfo.StartTime == 0 {
+        taskInfo.StartTime = startTime
+    }
+    taskInfo.Partitions = partitionTaskInfos
+    taskInfo.CompletedPartitions = completedPartitions
+    h.WriteJSON(w, taskInfo, http.StatusOK)
+}
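Note on the comparison progress metric introduced above: scrolled docs from both sides are measured against the combined source+target doc count and capped at 100. A self-contained sketch (function name and numbers are illustrative):

    package main

    import "fmt"

    // scrollPercent mirrors the calculation in getDataComparisonTaskInfo.
    func scrollPercent(sourceScrolled, targetScrolled, sourceDocs, targetDocs int64) float64 {
        total := sourceDocs + targetDocs
        if total <= 0 {
            return 0
        }
        percent := float64(sourceScrolled+targetScrolled) / float64(total) * 100
        if percent > 100 {
            percent = 100
        }
        return percent
    }

    func main() {
        // 600 + 900 scrolled of 1000 + 1000 total docs -> 75%
        fmt.Println(scrollPercent(600, 900, 1000, 1000))
    }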
@@ -293,6 +293,7 @@ func (p *processor) handleScheduleSubTask(taskItem *task.Task) error {

     // update sub migration task status to running and save task log
     taskItem.Metadata.Labels["execution_instance_id"] = instanceID
+    p.clearTaskState(taskItem)
     taskItem.Status = task.StatusRunning
     taskItem.StartTimeInMillis = time.Now().UnixMilli()

@@ -330,6 +331,9 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
     if sourceDumpTask.Status == task.StatusComplete && targetDumpTask.Status == task.StatusComplete {
         sourceDocs := migration_util.GetMapIntValue(util.MapStr(sourceDumpTask.Metadata.Labels), "scrolled_docs")
         targetDocs := migration_util.GetMapIntValue(util.MapStr(targetDumpTask.Metadata.Labels), "scrolled_docs")

+        taskItem.Metadata.Labels["source_scrolled"] = sourceDocs
+        taskItem.Metadata.Labels["target_scrolled"] = targetDocs
         if sourceDocs != targetDocs {
             now := time.Now()
             taskItem.CompletedTime = &now
@@ -424,19 +428,7 @@ func (p *processor) getPipelineTasks(taskItem *task.Task, cfg *migration_model.I
         err = fmt.Errorf("invalid pipeline task count: %d", len(ptasks))
         return
     }
-    for i, ptask := range ptasks {
-        if ptask.Metadata.Labels["pipeline_id"] == "dump_hash" {
-            // TODO: we can't handle when compare the same cluster & same index
-            // catch it earlier when creating the task
-            if ptask.Metadata.Labels["cluster_id"] == cfg.Source.ClusterId && ptask.Metadata.Labels["index_name"] == cfg.Source.Indices {
-                sourceDumpTask = &ptasks[i]
-            } else {
-                targetDumpTask = &ptasks[i]
-            }
-        } else if ptask.Metadata.Labels["pipeline_id"] == "index_diff" {
-            diffTask = &ptasks[i]
-        }
-    }
+    sourceDumpTask, targetDumpTask, diffTask = migration_util.SplitIndexComparisonTasks(ptasks, cfg)
     return
 }

@@ -478,3 +470,8 @@ func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.Ta
         migration_util.WriteLog(taskItem, taskResult, message)
     }
 }
+
+func (p *processor) clearTaskState(taskItem *task.Task) {
+    delete(taskItem.Metadata.Labels, "source_scrolled")
+    delete(taskItem.Metadata.Labels, "target_scrolled")
+}
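Note on clearTaskState above: progress labels surviving from a previous attempt would be summed into the totals again on retry, so they are deleted before the task is (re)started. A minimal sketch of the idea (label keys follow the diff; types simplified):

    package main

    import "fmt"

    // Clearing cached progress labels before a re-run keeps stale counts
    // from a failed attempt out of the aggregated totals. Comparison tasks
    // use "source_scrolled"/"target_scrolled"; migration tasks use
    // "index_docs"/"scrolled_docs".
    func clearTaskState(labels map[string]interface{}) {
        delete(labels, "source_scrolled")
        delete(labels, "target_scrolled")
    }

    func main() {
        labels := map[string]interface{}{
            "source_scrolled": int64(1200),
            "target_scrolled": int64(1200),
        }
        clearTaskState(labels)
        fmt.Println(labels) // map[] -- a fresh run starts from zero
    }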
@@ -306,8 +306,7 @@ func (p *processor) handleScheduleSubTask(taskItem *task.Task) error {

     // update sub migration task status to running and save task log
     taskItem.Metadata.Labels["execution_instance_id"] = instanceID
-    taskItem.Metadata.Labels["index_docs"] = 0
-    taskItem.Metadata.Labels["scrolled_docs"] = 0
+    p.clearTaskState(taskItem)
     taskItem.Status = task.StatusRunning
     taskItem.StartTimeInMillis = time.Now().UnixMilli()

@@ -331,8 +330,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
 	if totalDocs == 0 {
 		taskItem.Status = task.StatusComplete
-		taskItem.Metadata.Labels["scrolled_docs"] = 0
-		taskItem.Metadata.Labels["index_docs"] = 0
+		p.clearTaskState(taskItem)
 		now := time.Now()
 		taskItem.CompletedTime = &now
 
@@ -498,13 +496,7 @@ func (p *processor) getScrollBulkPipelineTasks(taskItem *task.Task) (scrollTask
 		err = fmt.Errorf("invalid pipeline task count: %d", len(ptasks))
 		return
 	}
-	for i, ptask := range ptasks {
-		if ptask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
-			bulkTask = &ptasks[i]
-		} else if ptask.Metadata.Labels["pipeline_id"] == "es_scroll" {
-			scrollTask = &ptasks[i]
-		}
-	}
+	scrollTask, bulkTask = migration_util.SplitIndexMigrationTasks(ptasks)
 	return
 }
 
@@ -571,3 +563,8 @@ func (p *processor) saveTaskAndWriteLog(taskItem *task.Task, taskResult *task.Ta
 	migration_util.WriteLog(taskItem, taskResult, message)
 }
+
+func (p *processor) clearTaskState(taskItem *task.Task) {
+	delete(taskItem.Metadata.Labels, "index_docs")
+	delete(taskItem.Metadata.Labels, "scrolled_docs")
+}
@@ -0,0 +1,132 @@
+package migration
+
+import (
+	log "github.com/cihub/seelog"
+
+	migration_model "infini.sh/console/plugin/migration/model"
+	migration_util "infini.sh/console/plugin/migration/util"
+
+	"infini.sh/framework/core/orm"
+	"infini.sh/framework/core/task"
+	"infini.sh/framework/core/util"
+)
+
+type MigrationIndexStateInfo struct {
+	ErrorPartitions int
+	IndexDocs       int64
+}
+
+/*
+We count data from two sources:
+- index_migrations with complete/error status
+- plus index_migration.index_docs with realtime bulk indexing info
+- realtime bulk indexing info is only available for running index_migrations
+*/
+func (h *APIHandler) getMigrationMajorTaskInfo(majorTaskID string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) {
+	taskQuery := util.MapStr{
+		"size": 500,
+		"query": util.MapStr{
+			"bool": util.MapStr{
+				"must": []util.MapStr{
+					{
+						"term": util.MapStr{
+							"parent_id": util.MapStr{
+								"value": majorTaskID,
+							},
+						},
+					},
+					{
+						"bool": util.MapStr{
+							"minimum_should_match": 1,
+							"should": []util.MapStr{
+								{
+									"term": util.MapStr{
+										"metadata.labels.pipeline_id": util.MapStr{
+											"value": "bulk_indexing",
+										},
+									},
+								},
+								{
+									"bool": util.MapStr{
+										"must": []util.MapStr{
+											{
+												"term": util.MapStr{
+													"metadata.type": util.MapStr{
+														"value": "index_migration",
+													},
+												},
+											},
+											{
+												"terms": util.MapStr{
+													"status": []string{task.StatusComplete, task.StatusError},
+												},
+											},
+										},
+									},
+								},
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	q := &orm.Query{
+		RawQuery: util.MustToJSONBytes(taskQuery),
+	}
+	err, result := orm.Search(task.Task{}, q)
+	if err != nil {
+		return taskStats, indexState, err
+	}
+
+	var pipelineTaskIDs = map[string][]string{}
+	var pipelineIndexNames = map[string]string{}
+	indexState = map[string]MigrationIndexStateInfo{}
+	for _, row := range result.Result {
+		buf := util.MustToJSONBytes(row)
+		subTask := task.Task{}
+		err := util.FromJSONBytes(buf, &subTask)
+		if err != nil {
+			log.Errorf("failed to unmarshal task, err: %v", err)
+			continue
+		}
+		if subTask.Metadata.Labels == nil {
+			continue
+		}
+		taskLabels := util.MapStr(subTask.Metadata.Labels)
+		indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
+		if indexName == "" {
+			continue
+		}
+
+		// add indexDocs of already complete/error
+		if subTask.Metadata.Type == "index_migration" {
+			indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs")
+			taskStats.IndexDocs += indexDocs
+			st := indexState[indexName]
+			st.IndexDocs += indexDocs
+			if subTask.Status == task.StatusError {
+				st.ErrorPartitions += 1
+			}
+			indexState[indexName] = st
+			continue
+		}
+		pipelineIndexNames[subTask.ID] = indexName
+
+		if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
+			pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
+		}
+	}
+
+	pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
+	for pipelineID, pipelineContext := range pipelineContexts {
+		// add indexDocs of running tasks
+		indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count")
+		taskStats.IndexDocs += indexDocs
+		indexName := pipelineIndexNames[pipelineID]
+		st := indexState[indexName]
+		st.IndexDocs += indexDocs
+		indexState[indexName] = st
+	}
+	return taskStats, indexState, nil
+}
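The new file above backs the migration info API: completed and errored index_migration subtasks contribute their persisted index_docs label, while running ones are resolved live through the gateway pipeline contexts. A hedged sketch of consuming the aggregate; the logging is illustrative, not the commit's actual handler:

    // Hypothetical consumer of getMigrationMajorTaskInfo.
    taskStats, indexState, err := h.getMigrationMajorTaskInfo(majorTaskID)
    if err != nil {
        log.Errorf("failed to get major task info, err: %v", err)
        return
    }
    for uniqueIndexName, st := range indexState {
        log.Infof("index %s: %d docs indexed, %d error partitions",
            uniqueIndexName, st.IndexDocs, st.ErrorPartitions)
    }
    log.Infof("total indexed docs: %d", taskStats.IndexDocs)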
@@ -28,6 +28,7 @@ type IndexPartition struct {
 type IndexInfo struct {
 	Name             string `json:"name"`
 	DocType          string `json:"doc_type"`
+	// NOTE: == 0 for migration target index
 	Docs             int64  `json:"docs"`
 	StoreSizeInBytes int    `json:"store_size_in_bytes"`
 }
 
@@ -26,10 +26,16 @@ type ClusterComparisonIndexConfig struct {
 	Partition *IndexPartition `json:"partition,omitempty"`
 
 	// only used in API
-	Percent         float64 `json:"percent,omitempty"`
+	ScrollPercent   float64 `json:"scroll_percent,omitempty"`
 	ErrorPartitions int     `json:"error_partitions,omitempty"`
 }
 
+type ClusterComparisonTaskState struct {
+	SourceScrollDocs int64
+	TargetScrollDocs int64
+	Status           string
+}
+
 type IndexComparisonTaskConfig struct {
 	Source IndexComparisonDumpConfig `json:"source"`
 	Target IndexComparisonDumpConfig `json:"target"`
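ClusterComparisonTaskState tracks both sides of the dump, which is what the renamed ScrollPercent field reports on. A hedged sketch of one way to derive it; the denominator convention is an assumption, not the commit's handler:

    // Hypothetical progress calculation for a comparison task.
    func scrollPercent(state migration_model.ClusterComparisonTaskState, totalDocs int64) float64 {
        if totalDocs <= 0 {
            return 0
        }
        // source and target each need a full scroll, hence twice the doc count
        scrolled := state.SourceScrollDocs + state.TargetScrollDocs
        return float64(scrolled) / float64(2*totalDocs) * 100
    }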
@@ -47,8 +53,13 @@ type IndexComparisonDumpConfig struct {
 	ScrollTime  string      `json:"scroll_time"`
 	QueryString string      `json:"query_string,omitempty"`
 	QueryDSL    util.MapStr `json:"query_dsl,omitempty"`
 
 	DocCount int64 `json:"doc_count"`
+
+	// Only populated for partitioned tasks
+	Start       float64     `json:"start"`
+	End         float64     `json:"end"`
+	Step        interface{} `json:"step"`
+	PartitionId int         `json:"partition_id"`
 }
 
 type IndexComparisonDiffConfig struct {
@@ -38,16 +38,11 @@ type ClusterMigrationIndexConfig struct {
 	ErrorPartitions int `json:"error_partitions,omitempty"`
 }
 
-type MajorTaskState struct {
+type ClusterMigrationTaskState struct {
 	IndexDocs int64
 	Status    string
 }
 
-type IndexStateInfo struct {
-	ErrorPartitions int
-	IndexDocs       float64
-}
-
 const (
 	IndexMigrationV0 = 0
 	IndexMigrationV1 = 1
@@ -70,11 +65,11 @@ type IndexMigrationSourceConfig struct {
 	TypeRename  util.MapStr `json:"type_rename,omitempty"`
 	QueryString string      `json:"query_string,omitempty"`
 	QueryDSL    util.MapStr `json:"query_dsl,omitempty"`
+	DocCount    int64       `json:"doc_count"`
 
 	// Partition configs
 	Start       float64     `json:"start"`
 	End         float64     `json:"end"`
-	DocCount int64 `json:"doc_count"`
 	Step        interface{} `json:"step"`
 	PartitionId int         `json:"partition_id"`
 }
@@ -0,0 +1,38 @@
+package util
+
+import (
+	migration_model "infini.sh/console/plugin/migration/model"
+	"infini.sh/framework/core/task"
+)
+
+/*
+These functions could return nil tasks
+*/
+
+func SplitIndexMigrationTasks(ptasks []task.Task) (scrollTask *task.Task, bulkTask *task.Task) {
+	for i, ptask := range ptasks {
+		if ptask.Metadata.Labels["pipeline_id"] == "bulk_indexing" {
+			bulkTask = &ptasks[i]
+		} else if ptask.Metadata.Labels["pipeline_id"] == "es_scroll" {
+			scrollTask = &ptasks[i]
+		}
+	}
+	return
+}
+
+func SplitIndexComparisonTasks(ptasks []task.Task, cfg *migration_model.IndexComparisonTaskConfig) (sourceDumpTask *task.Task, targetDumpTask *task.Task, diffTask *task.Task) {
+	for i, ptask := range ptasks {
+		if ptask.Metadata.Labels["pipeline_id"] == "dump_hash" {
+			// TODO: we can't handle comparing the same cluster & the same index
+			// catch it earlier when creating the task
+			if ptask.Metadata.Labels["cluster_id"] == cfg.Source.ClusterId && ptask.Metadata.Labels["index_name"] == cfg.Source.Indices {
+				sourceDumpTask = &ptasks[i]
+			} else {
+				targetDumpTask = &ptasks[i]
+			}
+		} else if ptask.Metadata.Labels["pipeline_id"] == "index_diff" {
+			diffTask = &ptasks[i]
+		}
+	}
+	return
+}
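As the comment in the new file says, either helper can return nil pointers when a pipeline subtask is missing, so callers are expected to guard before dereferencing. A minimal sketch of that guard; the error message is illustrative:

    // Hypothetical caller guarding against missing pipeline subtasks.
    scrollTask, bulkTask := migration_util.SplitIndexMigrationTasks(ptasks)
    if scrollTask == nil || bulkTask == nil {
        return fmt.Errorf("invalid pipeline subtasks for task [%s]", taskItem.ID)
    }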
@@ -41,6 +41,12 @@ func IsRunningState(status string) bool {
 	return util.StringInArray(runningTaskStatus, status)
 }
 
+var pendingTaskStatus = []string{task.StatusRunning, task.StatusReady, task.StatusPendingStop}
+
+func IsPendingState(status string) bool {
+	return util.StringInArray(pendingTaskStatus, status)
+}
+
 func GetTaskConfig(task *task.Task, config interface{}) error {
 	if task.Config_ == nil {
 		return util.FromJSONBytes([]byte(task.ConfigString), config)
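IsPendingState treats ready, running, and pending-stop tasks as still in flight. A hedged sketch of the kind of guard it enables; the delete-path framing is an assumption:

    // Hypothetical guard: refuse an operation while any subtask is still pending.
    for _, subTask := range subTasks {
        if migration_util.IsPendingState(subTask.Status) {
            return fmt.Errorf("task [%s] has pending subtasks, stop it first", taskItem.ID)
        }
    }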