377 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
			
		
		
	
	
			377 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
package task_manager
 | 
						|
 | 
						|
import (
 | 
						|
	"fmt"
 | 
						|
	"net/http"
 | 
						|
	"time"
 | 
						|
 | 
						|
	log "github.com/cihub/seelog"
 | 
						|
 | 
						|
	"infini.sh/console/plugin/task_manager/cluster_migration"
 | 
						|
	migration_model "infini.sh/console/plugin/task_manager/model"
 | 
						|
	migration_util "infini.sh/console/plugin/task_manager/util"
 | 
						|
 | 
						|
	"infini.sh/framework/core/api/rbac"
 | 
						|
	httprouter "infini.sh/framework/core/api/router"
 | 
						|
	"infini.sh/framework/core/global"
 | 
						|
	"infini.sh/framework/core/orm"
 | 
						|
	"infini.sh/framework/core/task"
 | 
						|
	"infini.sh/framework/core/util"
 | 
						|
)
 | 
						|
 | 
						|
func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 | 
						|
	clusterTaskConfig := &migration_model.ClusterMigrationTaskConfig{}
 | 
						|
	err := h.DecodeJSON(req, clusterTaskConfig)
 | 
						|
	if err != nil {
 | 
						|
		log.Error(err)
 | 
						|
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	user, err := rbac.FromUserContext(req.Context())
 | 
						|
	if err != nil {
 | 
						|
		log.Error(err)
 | 
						|
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	t, err := cluster_migration.CreateTask(clusterTaskConfig, user)
 | 
						|
	if err != nil {
 | 
						|
		log.Error(err)
 | 
						|
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	h.WriteCreatedOKJSON(w, t.ID)
 | 
						|
}
 | 
						|
 | 
						|
func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 | 
						|
	id := ps.MustGetParameter("task_id")
 | 
						|
 | 
						|
	obj := task.Task{}
 | 
						|
	obj.ID = id
 | 
						|
 | 
						|
	exists, err := orm.Get(&obj)
 | 
						|
	if !exists || err != nil {
 | 
						|
		h.WriteJSON(w, util.MapStr{
 | 
						|
			"_id":   id,
 | 
						|
			"found": false,
 | 
						|
		}, http.StatusNotFound)
 | 
						|
		return
 | 
						|
	}
 | 
						|
	taskConfig := &migration_model.ClusterMigrationTaskConfig{}
 | 
						|
	err = migration_util.GetTaskConfig(&obj, taskConfig)
 | 
						|
	if err != nil {
 | 
						|
		log.Error(err)
 | 
						|
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	_, indexState, err := h.getMigrationMajorTaskInfo(id)
 | 
						|
	if err != nil {
 | 
						|
		log.Errorf("failed to get major task info, err: %v", err)
 | 
						|
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	var completedIndices int
 | 
						|
	for i, index := range taskConfig.Indices {
 | 
						|
		indexName := index.Source.GetUniqueIndexName()
 | 
						|
		count := indexState[indexName].IndexDocs
 | 
						|
		percent := float64(count) / float64(index.Source.Docs) * 100
 | 
						|
		if percent > 100 {
 | 
						|
			percent = 100
 | 
						|
		}
 | 
						|
		taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceDocs
 | 
						|
		taskConfig.Indices[i].Target.Docs = count
 | 
						|
		taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
 | 
						|
		taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
 | 
						|
		if count == index.Source.Docs {
 | 
						|
			completedIndices++
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	cfg := global.MustLookup("cluster_migration_config")
 | 
						|
	if migrationConfig, ok := cfg.(*DispatcherConfig); ok {
 | 
						|
		if obj.Metadata.Labels == nil {
 | 
						|
			obj.Metadata.Labels = util.MapStr{}
 | 
						|
		}
 | 
						|
		obj.Metadata.Labels["log_info"] = util.MapStr{
 | 
						|
			"cluster_id": migrationConfig.Elasticsearch,
 | 
						|
			"index_name": migrationConfig.LogIndexName,
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	_, repeatStatus, err := h.calcRepeatingStatus(&obj)
 | 
						|
	if err != nil {
 | 
						|
		log.Warnf("failed to calc repeat info, err: %v", err)
 | 
						|
	}
 | 
						|
	obj.Metadata.Labels["repeat"] = repeatStatus
 | 
						|
 | 
						|
	obj.ConfigString = util.MustToJSON(taskConfig)
 | 
						|
	obj.Metadata.Labels["completed_indices"] = completedIndices
 | 
						|
	h.WriteJSON(w, obj, http.StatusOK)
 | 
						|
}
 | 
						|
 | 
						|
func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
 | 
						|
	id := ps.MustGetParameter("task_id")
 | 
						|
	uniqueIndexName := ps.MustGetParameter("index")
 | 
						|
	majorTask := task.Task{}
 | 
						|
	majorTask.ID = id
 | 
						|
	exists, err := orm.Get(&majorTask)
 | 
						|
	if !exists || err != nil {
 | 
						|
		h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusInternalServerError)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	taskConfig := &migration_model.ClusterMigrationTaskConfig{}
 | 
						|
	err = migration_util.GetTaskConfig(&majorTask, taskConfig)
 | 
						|
	if err != nil {
 | 
						|
		log.Error(err)
 | 
						|
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	taskInfo := &TaskInfoResponse{
 | 
						|
		TaskID:    id,
 | 
						|
		StartTime: majorTask.StartTimeInMillis,
 | 
						|
		Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels),
 | 
						|
	}
 | 
						|
 | 
						|
	subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName)
 | 
						|
 | 
						|
	taskInfo.DataPartition = len(subTasks)
 | 
						|
	if len(subTasks) == 0 {
 | 
						|
		h.WriteJSON(w, taskInfo, http.StatusOK)
 | 
						|
		return
 | 
						|
	}
 | 
						|
 | 
						|
	var scrollStats = map[string]int64{}
 | 
						|
	var bulkStats = map[string]int64{}
 | 
						|
	pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
 | 
						|
	for pipelineID, pipelineContext := range pipelineContexts {
 | 
						|
		if pid, ok := pipelineSubParentIDs[pipelineID]; ok {
 | 
						|
			if vv := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs"); vv > 0 {
 | 
						|
				scrollStats[pid] = vv
 | 
						|
			}
 | 
						|
			if vv := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count"); vv > 0 {
 | 
						|
				bulkStats[pid] = vv
 | 
						|
			}
 | 
						|
		}
 | 
						|
	}
 | 
						|
 | 
						|
	startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating)
 | 
						|
 | 
						|
	var partitionTaskInfos []util.MapStr
 | 
						|
 | 
						|
	for i, ptask := range subTasks {
 | 
						|
		cfg := migration_model.IndexMigrationTaskConfig{}
 | 
						|
		err := migration_util.GetTaskConfig(&ptask, &cfg)
 | 
						|
		if err != nil {
 | 
						|
			log.Errorf("failed to get task config, err: %v", err)
 | 
						|
			continue
 | 
						|
		}
 | 
						|
		if i == 0 {
 | 
						|
			taskInfo.Step = cfg.Source.Step
 | 
						|
		}
 | 
						|
 | 
						|
		var durationInMS int64
 | 
						|
		var subCompletedTime int64
 | 
						|
		if ptask.StartTimeInMillis > 0 {
 | 
						|
			if migration_util.IsPendingState(ptask.Status) {
 | 
						|
				durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis
 | 
						|
			} else if ptask.CompletedTime != nil {
 | 
						|
				subCompletedTime = ptask.CompletedTime.UnixMilli()
 | 
						|
				durationInMS = subCompletedTime - ptask.StartTimeInMillis
 | 
						|
			}
 | 
						|
		}
 | 
						|
		var (
 | 
						|
			scrollDocs int64
 | 
						|
			indexDocs  int64
 | 
						|
		)
 | 
						|
		ptaskLabels := util.MapStr(ptask.Metadata.Labels)
 | 
						|
		if vv, ok := scrollStats[ptask.ID]; ok {
 | 
						|
			scrollDocs = vv
 | 
						|
		} else {
 | 
						|
			scrollDocs = migration_util.GetMapIntValue(ptaskLabels, "scrolled_docs")
 | 
						|
		}
 | 
						|
		if vv, ok := bulkStats[ptask.ID]; ok {
 | 
						|
			indexDocs = vv
 | 
						|
		} else {
 | 
						|
			indexDocs = migration_util.GetMapIntValue(ptaskLabels, "index_docs")
 | 
						|
		}
 | 
						|
 | 
						|
		partitionTotalDocs := cfg.Source.DocCount
 | 
						|
		partitionTaskInfo := util.MapStr{
 | 
						|
			"task_id":        ptask.ID,
 | 
						|
			"status":         ptask.Status,
 | 
						|
			"start_time":     ptask.StartTimeInMillis,
 | 
						|
			"completed_time": subCompletedTime,
 | 
						|
			"start":          cfg.Source.Start,
 | 
						|
			"end":            cfg.Source.End,
 | 
						|
			"duration":       durationInMS,
 | 
						|
			"scroll_docs":    scrollDocs,
 | 
						|
			"index_docs":     indexDocs,
 | 
						|
			"total_docs":     partitionTotalDocs,
 | 
						|
		}
 | 
						|
		scrollTask, bulkTask := migration_util.SplitIndexMigrationTasks(parentIDPipelineTasks[ptask.ID])
 | 
						|
		if scrollTask != nil {
 | 
						|
			partitionTaskInfo["scroll_task"] = util.MapStr{
 | 
						|
				"id":     scrollTask.ID,
 | 
						|
				"status": scrollTask.Status,
 | 
						|
			}
 | 
						|
		}
 | 
						|
		if bulkTask != nil {
 | 
						|
			partitionTaskInfo["bulk_task"] = util.MapStr{
 | 
						|
				"id":     bulkTask.ID,
 | 
						|
				"status": bulkTask.Status,
 | 
						|
			}
 | 
						|
		}
 | 
						|
		partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo)
 | 
						|
	}
 | 
						|
	taskInfo.CompletedTime = completedTime
 | 
						|
	taskInfo.Duration = duration
 | 
						|
	// NOTE: overwrite major task start time with the first started sub task
 | 
						|
	if taskInfo.StartTime == 0 {
 | 
						|
		taskInfo.StartTime = startTime
 | 
						|
	}
 | 
						|
	taskInfo.Partitions = partitionTaskInfos
 | 
						|
	taskInfo.CompletedPartitions = completedPartitions
 | 
						|
	h.WriteJSON(w, taskInfo, http.StatusOK)
 | 
						|
}
 | 
						|
 | 
						|
// MigrationIndexStateInfo aggregates per-index migration progress collected
// from the index_migration sub-tasks of a cluster migration task.
type MigrationIndexStateInfo struct {
	// ErrorPartitions counts sub-tasks (partitions) that ended in error state.
	ErrorPartitions int
	// IndexDocs is the number of documents indexed into the target so far,
	// combining finished tasks' labels and realtime bulk-indexing stats.
	IndexDocs       int64
	// SourceDocs is the total document count of the source partitions.
	SourceDocs      int64
}
 | 
						|
 | 
						|
/*
getMigrationMajorTaskInfo aggregates document-count statistics for the cluster
migration task identified by id, returning the overall task stats plus a
per-unique-index-name breakdown.

We count data from two sources:
  - index_migrations with complete/error status
  - plus index_migration.index_docs with realtime bulk indexing info
  - realtime bulk indexing info is only available for running index_migrations
*/
func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) {
	// Query all direct index_migration children of the major task.
	taskQuery := util.MapStr{
		"size": 500,
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": []util.MapStr{
					{
						"term": util.MapStr{
							"parent_id": util.MapStr{
								"value": id,
							},
						},
					},
					{
						"term": util.MapStr{
							"metadata.type": util.MapStr{
								"value": "index_migration",
							},
						},
					},
				},
			},
		},
	}
	subTasks, err := migration_util.GetTasks(taskQuery)
	if err != nil {
		return taskStats, indexState, err
	}

	var indexMigrationTaskIDs []string
	indexState = map[string]MigrationIndexStateInfo{}
	for _, subTask := range subTasks {
		taskLabels := util.MapStr(subTask.Metadata.Labels)
		indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
		// Skip tasks that don't carry an index name; we can't attribute them.
		if indexName == "" {
			continue
		}

		cfg := migration_model.IndexMigrationTaskConfig{}
		err = migration_util.GetTaskConfig(&subTask, &cfg)
		if err != nil {
			// Best-effort aggregation: skip unparsable configs.
			log.Errorf("failed to get task config, err: %v", err)
			continue
		}

		// Source doc counts come from the task config regardless of status.
		taskStats.SourceDocs += cfg.Source.DocCount
		st := indexState[indexName]
		st.SourceDocs += cfg.Source.DocCount
		indexState[indexName] = st

		// Running tasks don't have a final index_docs label yet; their
		// realtime progress is collected from the gateway further below.
		if subTask.Status == task.StatusRunning {
			indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
			continue
		}

		// Finished (complete/error) tasks persist their indexed doc count
		// in the "index_docs" label.
		indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs")
		taskStats.IndexDocs += indexDocs
		st.IndexDocs += indexDocs
		if subTask.Status == task.StatusError {
			st.ErrorPartitions += 1
			taskStats.ErrorPartitions += 1
		}
		indexState[indexName] = st
		indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
	}

	if len(indexMigrationTaskIDs) == 0 {
		return taskStats, indexState, nil
	}

	// Query the bulk_indexing pipeline children of the index_migration tasks
	// so we can ask the gateway for realtime progress.
	taskQuery = util.MapStr{
		"size": 500,
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": []util.MapStr{
					{
						"terms": util.MapStr{
							"parent_id": indexMigrationTaskIDs,
						},
					},
					{
						"term": util.MapStr{
							"metadata.labels.pipeline_id": util.MapStr{
								"value": "bulk_indexing",
							},
						},
					},
				},
			},
		},
	}
	subTasks, err = migration_util.GetTasks(taskQuery)
	if err != nil {
		return taskStats, indexState, err
	}

	// Group pipeline task IDs by the instance executing them, and remember
	// which unique index each pipeline task belongs to.
	var pipelineTaskIDs = map[string][]string{}
	var pipelineIndexNames = map[string]string{}
	for _, subTask := range subTasks {
		taskLabels := util.MapStr(subTask.Metadata.Labels)
		indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
		if indexName == "" {
			continue
		}

		pipelineIndexNames[subTask.ID] = indexName

		if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
			pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
		}
	}

	pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
	for pipelineID, pipelineContext := range pipelineContexts {
		// add indexDocs of running tasks
		indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count")
		taskStats.IndexDocs += indexDocs
		indexName := pipelineIndexNames[pipelineID]
		st := indexState[indexName]
		st.IndexDocs += indexDocs
		indexState[indexName] = st
	}
	return taskStats, indexState, nil
}
 |