package task_manager

import (
	"fmt"
	"net/http"
	"strings"
	"time"

	log "github.com/cihub/seelog"

	"infini.sh/console/plugin/task_manager/cluster_migration"
	migration_model "infini.sh/console/plugin/task_manager/model"
	migration_util "infini.sh/console/plugin/task_manager/util"

	"infini.sh/framework/core/api/rbac"
	httprouter "infini.sh/framework/core/api/router"
	"infini.sh/framework/core/global"
	"infini.sh/framework/core/orm"
	"infini.sh/framework/core/task"
	"infini.sh/framework/core/util"
)
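
// createDataMigrationTask handles the task creation API: it decodes a
// ClusterMigrationTaskConfig from the request body, resolves the requesting
// user from the context, creates the major cluster migration task, and
// responds with the new task ID.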
func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	clusterTaskConfig := &migration_model.ClusterMigrationTaskConfig{}
	err := h.DecodeJSON(req, clusterTaskConfig)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	user, err := rbac.FromUserContext(req.Context())
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}
	t, err := cluster_migration.CreateTask(clusterTaskConfig, user)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	h.WriteCreatedOKJSON(w, t.ID)
}
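
// getDataMigrationTaskInfo returns a major migration task by task_id,
// enriching each index in its config with progress info (indexed/exported
// percent, error partitions, running children) aggregated from its
// index_migration sub tasks, and labels the task with log_info, repeat
// status, and the number of completed indices.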
func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	id := ps.MustGetParameter("task_id")

	obj := task.Task{}
	obj.ID = id

	exists, err := orm.Get(&obj)
	if !exists || err != nil {
		h.WriteJSON(w, util.MapStr{
			"_id":   id,
			"found": false,
		}, http.StatusNotFound)
		return
	}
	taskConfig := &migration_model.ClusterMigrationTaskConfig{}
	err = migration_util.GetTaskConfig(&obj, taskConfig)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	_, indexState, err := h.getMigrationMajorTaskInfo(id)
	if err != nil {
		log.Errorf("failed to get major task info, err: %v", err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	var completedIndices int
	for i, index := range taskConfig.Indices {
		indexName := index.Source.GetUniqueIndexName()
		count := indexState[indexName].IndexDocs
		sourceDocs := index.Source.Docs
		var percent float64
		var exportedPercent float64
		if sourceDocs <= 0 {
			percent = 100
			exportedPercent = 100
		} else {
			percent = float64(count) / float64(sourceDocs) * 100
			if percent > 100 {
				percent = 100
			}
			exportedPercent = float64(indexState[indexName].ScrollDocs) / float64(sourceDocs) * 100
			if exportedPercent > 100 {
				exportedPercent = 100
			}
		}
		//taskConfig.Indices[i].Source.Docs = sourceDocs
		taskConfig.Indices[i].Target.Docs = count
		taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
		taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
		taskConfig.Indices[i].RunningChildren = indexState[indexName].RunningChildren
		taskConfig.Indices[i].ExportedPercent = util.ToFixed(exportedPercent, 2)
		if count == index.Source.Docs {
			completedIndices++
		}
	}

	cfg := global.MustLookup("cluster_migration_config")
	if migrationConfig, ok := cfg.(*DispatcherConfig); ok {
		if obj.Metadata.Labels == nil {
			obj.Metadata.Labels = util.MapStr{}
		}
		obj.Metadata.Labels["log_info"] = util.MapStr{
			"cluster_id": migrationConfig.Elasticsearch,
			"index_name": migrationConfig.LogIndexName,
		}
	}

	_, repeatStatus, err := h.calcRepeatingStatus(&obj)
	if err != nil {
		log.Warnf("failed to calc repeat info, err: %v", err)
	}
	obj.Metadata.Labels["repeat"] = repeatStatus

	obj.ConfigString = util.MustToJSON(taskConfig)
	obj.Metadata.Labels["completed_indices"] = completedIndices
	h.WriteJSON(w, obj, http.StatusOK)
}
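
// getDataMigrationTaskOfIndex returns the migration progress of a single
// unique index (task_id + index) inside a major migration task, including
// per-partition sub task stats (scroll/bulk doc counts, durations, pipeline
// sub tasks) and the worker instances executing them.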
func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	id := ps.MustGetParameter("task_id")
	uniqueIndexName := ps.MustGetParameter("index")
	majorTask := task.Task{}
	majorTask.ID = id
	exists, err := orm.Get(&majorTask)
	if !exists || err != nil {
		h.WriteError(w, fmt.Sprintf("task [%s] not found", id), http.StatusNotFound)
		return
	}

	taskConfig := &migration_model.ClusterMigrationTaskConfig{}
	err = migration_util.GetTaskConfig(&majorTask, taskConfig)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	taskInfo := &TaskInfoResponse{
		TaskID:    id,
		StartTime: majorTask.StartTimeInMillis,
		Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels),
	}
	if taskInfo.Repeating {
		_, repeatStatus, err := h.calcRepeatingStatus(&majorTask)
		if err != nil {
			log.Error(err)
			h.WriteError(w, err.Error(), http.StatusInternalServerError)
			return
		}
		taskInfo.NextRunTime = repeatStatus.NextRunTime
	}
	indexParts := strings.Split(uniqueIndexName, ":")
	for _, index := range taskConfig.Indices {
		if index.Source.Name == indexParts[0] {
			taskInfo.Incremental = index.Incremental
		}
	}

	subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	taskInfo.DataPartition = len(subTasks)
	if len(subTasks) == 0 {
		h.WriteJSON(w, taskInfo, http.StatusOK)
		return
	}

	var scrollStats = map[string]int64{}
	var bulkStats = map[string]int64{}
	pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
	for pipelineID, pipelineContext := range pipelineContexts {
		if pid, ok := pipelineSubParentIDs[pipelineID]; ok {
			if vv := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs"); vv > 0 {
				scrollStats[pid] = vv
			}
			if vv := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count"); vv > 0 {
				bulkStats[pid] = vv
			}
		}
	}

	startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating)

	var partitionTaskInfos []util.MapStr

	var workers = map[string]struct{}{}

	for i, ptask := range subTasks {
		cfg := migration_model.IndexMigrationTaskConfig{}
		err := migration_util.GetTaskConfig(&ptask, &cfg)
		if err != nil {
			log.Errorf("failed to get task config, err: %v", err)
			continue
		}
		if i == 0 {
			taskInfo.Step = cfg.Source.Step
		}
		instID := migration_util.GetMapStringValue(ptask.Metadata.Labels, "execution_instance_id")
		if instID != "" {
			workers[instID] = struct{}{}
		}
		var durationInMS int64
		var subCompletedTime int64
		if ptask.StartTimeInMillis > 0 {
			if migration_util.IsPendingState(ptask.Status) {
				durationInMS = time.Now().UnixMilli() - ptask.StartTimeInMillis
			} else if ptask.CompletedTime != nil {
				subCompletedTime = ptask.CompletedTime.UnixMilli()
				durationInMS = subCompletedTime - ptask.StartTimeInMillis
			}
		}
		var (
			scrollDocs int64
			indexDocs  int64
		)
		ptaskLabels := util.MapStr(ptask.Metadata.Labels)
		if vv, ok := scrollStats[ptask.ID]; ok {
			scrollDocs = vv
		} else {
			scrollDocs = migration_util.GetMapIntValue(ptaskLabels, "scrolled_docs")
		}
		if vv, ok := bulkStats[ptask.ID]; ok {
			indexDocs = vv
		} else {
			indexDocs = migration_util.GetMapIntValue(ptaskLabels, "index_docs")
		}

		partitionTotalDocs := cfg.Source.DocCount
		partitionTaskInfo := util.MapStr{
			"task_id":        ptask.ID,
			"status":         ptask.Status,
			"start_time":     ptask.StartTimeInMillis,
			"completed_time": subCompletedTime,
			"start":          cfg.Source.Start,
			"end":            cfg.Source.End,
			"duration":       durationInMS,
			"scroll_docs":    scrollDocs,
			"index_docs":     indexDocs,
			"total_docs":     partitionTotalDocs,
		}
		scrollTask, bulkTask := migration_util.SplitIndexMigrationTasks(parentIDPipelineTasks[ptask.ID])
		if scrollTask != nil {
			partitionTaskInfo["scroll_task"] = util.MapStr{
				"id":     scrollTask.ID,
				"status": scrollTask.Status,
			}
		}
		if bulkTask != nil {
			partitionTaskInfo["bulk_task"] = util.MapStr{
				"id":     bulkTask.ID,
				"status": bulkTask.Status,
			}
		}
		partitionTaskInfos = append(partitionTaskInfos, partitionTaskInfo)
	}
	taskInfo.CompletedTime = completedTime
	taskInfo.Duration = duration
	// NOTE: overwrite major task start time with the first started sub task
	if taskInfo.StartTime == 0 {
		taskInfo.StartTime = startTime
	}
	taskInfo.Partitions = partitionTaskInfos
	taskInfo.CompletedPartitions = completedPartitions
	for _, node := range taskConfig.Settings.Execution.Nodes.Permit {
		if _, ok := workers[node.ID]; ok {
			taskInfo.Workers = append(taskInfo.Workers, util.MapStr{
				"id":   node.ID,
				"name": node.Name,
			})
		}
	}
	h.WriteJSON(w, taskInfo, http.StatusOK)
}
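
// MigrationIndexStateInfo aggregates migration progress for a single unique
// index across all of its index_migration sub tasks.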
type MigrationIndexStateInfo struct {
	ErrorPartitions int
	IndexDocs       int64
	SourceDocs      int64
	RunningChildren int
	ScrollDocs      int64
}

/*
getMigrationMajorTaskInfo counts data from two sources:
- index_migration tasks with complete/error status, using their recorded index_docs
- realtime bulk indexing info, which is only available for running index_migrations
*/
func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) {
	var pipelineTaskIDs = map[string][]string{}
	var pipelineIndexNames = map[string]string{}
	indexState = map[string]MigrationIndexStateInfo{}
	const size = 500
	var (
		from    = -size
		hasMore = true
	)
	for hasMore {
		from += size
		taskQuery := util.MapStr{
			"from": from,
			"size": size,
			"sort": []util.MapStr{
				{
					"created": util.MapStr{
						"order": "asc",
					},
				},
			},
			"query": util.MapStr{
				"bool": util.MapStr{
					"must": []util.MapStr{
						{
							"term": util.MapStr{
								"parent_id": util.MapStr{
									"value": id,
								},
							},
						},
						{
							"term": util.MapStr{
								"metadata.type": util.MapStr{
									"value": "index_migration",
								},
							},
						},
					},
				},
			},
		}
		subTasks, err := migration_util.GetTasks(taskQuery)
		if err != nil {
			return taskStats, indexState, err
		}
		if len(subTasks) < size {
			hasMore = false
		}

		var indexMigrationTaskIDs []string
		for _, subTask := range subTasks {
			taskLabels := util.MapStr(subTask.Metadata.Labels)
			indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
			if indexName == "" {
				continue
			}

			cfg := migration_model.IndexMigrationTaskConfig{}
			err = migration_util.GetTaskConfig(&subTask, &cfg)
			if err != nil {
				log.Errorf("failed to get task config, err: %v", err)
				continue
			}

			taskStats.SourceDocs += cfg.Source.DocCount
			st := indexState[indexName]
			st.SourceDocs += cfg.Source.DocCount
			scrollDocs := migration_util.GetMapIntValue(taskLabels, "scrolled_docs")
			st.ScrollDocs += scrollDocs

			if subTask.Status == task.StatusRunning {
				st.RunningChildren++
				indexState[indexName] = st
				indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
				continue
			}

			indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs")
			taskStats.IndexDocs += indexDocs
			st.IndexDocs += indexDocs

			if subTask.Status == task.StatusError {
				st.ErrorPartitions += 1
				taskStats.ErrorPartitions += 1
			}
			indexState[indexName] = st
			indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
		}

		if len(indexMigrationTaskIDs) == 0 {
			continue
		}

		taskQuery = util.MapStr{
			"size": len(indexMigrationTaskIDs) * 2,
			"query": util.MapStr{
				"bool": util.MapStr{
					"must": []util.MapStr{
						{
							"terms": util.MapStr{
								"parent_id": indexMigrationTaskIDs,
							},
						},
						//{
						//	"term": util.MapStr{
						//		"metadata.labels.pipeline_id": util.MapStr{
						//			"value": "bulk_indexing",
						//		},
						//	},
						//},
					},
				},
			},
		}
		subTasks, err = migration_util.GetTasks(taskQuery)
		if err != nil {
			return taskStats, indexState, err
		}

		for _, subTask := range subTasks {
			taskLabels := util.MapStr(subTask.Metadata.Labels)
			indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
			if indexName == "" {
				continue
			}

			pipelineIndexNames[subTask.ID] = indexName

			if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
				pipelineTaskIDs[instID] = append(pipelineTaskIDs[instID], subTask.ID)
			}
		}
	}

	pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
	for pipelineID, pipelineContext := range pipelineContexts {
		// add indexDocs of running tasks
		indexDocs := migration_util.GetMapIntValue(pipelineContext, "bulk_indexing.success.count")
		scrollDocs := migration_util.GetMapIntValue(pipelineContext, "es_scroll.scrolled_docs")
		taskStats.IndexDocs += indexDocs
		indexName := pipelineIndexNames[pipelineID]
		st := indexState[indexName]
		st.IndexDocs += indexDocs
		st.ScrollDocs += scrollDocs
		indexState[indexName] = st
	}
	return taskStats, indexState, nil
}
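
// restartAllFailedPartitions resets every cluster_migration task in error
// status back to running, and every index_migration task in error status
// back to ready, using update-by-query requests.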
func (h *APIHandler) restartAllFailedPartitions(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
	mustQ := []util.MapStr{
		{
			"term": util.MapStr{
				"metadata.type": util.MapStr{
					"value": "cluster_migration",
				},
			},
		},
		{
			"term": util.MapStr{
				"status": util.MapStr{
					"value": "error",
				},
			},
		},
	}

	queryDSL := util.MapStr{
		"query": util.MapStr{
			"bool": util.MapStr{
				"must": mustQ,
			},
		},
		"script": util.MapStr{
			"source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusRunning),
		},
	}

	body := util.MustToJSONBytes(queryDSL)

	err := orm.UpdateBy(&task.Task{}, body)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// update status of sub tasks
	mustQ[0] = util.MapStr{
		"term": util.MapStr{
			"metadata.type": util.MapStr{
				"value": "index_migration",
			},
		},
	}
	queryDSL["script"] = util.MapStr{
		"source": fmt.Sprintf("ctx._source['status'] = '%s'", task.StatusReady),
	}
	body = util.MustToJSONBytes(queryDSL)
	err = orm.UpdateBy(&task.Task{}, body)
	if err != nil {
		log.Error(err)
		h.WriteError(w, err.Error(), http.StatusInternalServerError)
		return
	}

	h.WriteAckOKJSON(w)
}