[migration] support repeating task & incremental index (#108)
[migration] return error info for comparison/migration task
[migration] list api returns running_children
[migration] add pause/resume and repeating info
mark invalid repeat as done
[migration] fix doc count
revert unneeded changes
cleanup marshals & unmarshals
update apis v2, unify repeat & incremental configuration
[migration] support repeating task & incremental index

Co-authored-by: Kassian Sun <kassiansun@outlook.com>
Co-authored-by: silenceqi <silenceqi@infini.ltd>
Co-authored-by: liugq <silenceqi@hotmail.com>
Co-authored-by: hardy <luohf@infinilabs.com>
This commit is contained in:
parent 02975f33f9
commit 0f0794aca7
@@ -17,7 +17,6 @@ import (
     migration_util "infini.sh/console/plugin/migration/util"
 
     "infini.sh/framework/core/api"
-    "infini.sh/framework/core/api/rbac"
     "infini.sh/framework/core/api/rbac/enum"
     httprouter "infini.sh/framework/core/api/router"
     "infini.sh/framework/core/elastic"
@@ -34,6 +33,8 @@ func InitAPI() {
     api.HandleAPIMethod(api.DELETE, "/migration/data/:task_id", handler.RequirePermission(handler.deleteTask, enum.PermissionMigrationTaskWrite))
     api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionMigrationTaskWrite))
     api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionMigrationTaskWrite))
+    api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_pause", handler.RequirePermission(handler.pauseTask, enum.PermissionMigrationTaskWrite))
+    api.HandleAPIMethod(api.POST, "/migration/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionMigrationTaskWrite))
     api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info", handler.RequirePermission(handler.getDataMigrationTaskInfo, enum.PermissionMigrationTaskRead))
     api.HandleAPIMethod(api.GET, "/migration/data/:task_id/info/:index", handler.RequirePermission(handler.getDataMigrationTaskOfIndex, enum.PermissionMigrationTaskRead))
 
@@ -44,6 +45,8 @@ func InitAPI() {
     api.HandleAPIMethod(api.GET, "/comparison/data/:task_id/info/:index", handler.RequirePermission(handler.getDataComparisonTaskOfIndex, enum.PermissionComparisonTaskRead))
     api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_start", handler.RequirePermission(handler.startTask, enum.PermissionComparisonTaskWrite))
     api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_stop", handler.RequirePermission(handler.stopTask, enum.PermissionComparisonTaskWrite))
+    api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_pause", handler.RequirePermission(handler.pauseTask, enum.PermissionComparisonTaskWrite))
+    api.HandleAPIMethod(api.POST, "/comparison/data/:task_id/_resume", handler.RequirePermission(handler.resumeTask, enum.PermissionComparisonTaskWrite))
 
     api.HandleAPIMethod(api.POST, "/migration/data/_validate", handler.RequireLogin(handler.validateDataMigration))
     api.HandleAPIMethod(api.POST, "/elasticsearch/:id/index/:index/_partition", handler.getIndexPartitionInfo)
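For reference, a minimal sketch of driving the two new endpoints from a Go client. Only the paths and the POST method come from the routes above; the console address, the task id value, and the absence of authentication are assumptions for illustration.

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Assumed local console address; adjust host, port and auth for a real deployment.
	base := "http://localhost:9000"
	taskID := "your-task-id" // placeholder, use a real cluster-level task id

	for _, action := range []string{"_pause", "_resume"} {
		url := fmt.Sprintf("%s/migration/data/%s/%s", base, taskID, action)
		resp, err := http.Post(url, "application/json", nil)
		if err != nil {
			fmt.Println("request failed:", err)
			return
		}
		fmt.Println(action, "->", resp.Status)
		resp.Body.Close()
	}
}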
@@ -57,63 +60,6 @@ type APIHandler struct {
     api.Handler
 }
 
-func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
-    clusterTaskConfig := &migration_model.ClusterMigrationTaskConfig{}
-    err := h.DecodeJSON(req, clusterTaskConfig)
-    if err != nil {
-        log.Error(err)
-        h.WriteError(w, err.Error(), http.StatusInternalServerError)
-        return
-    }
-    if len(clusterTaskConfig.Indices) == 0 {
-        h.WriteError(w, "indices must not be empty", http.StatusInternalServerError)
-        return
-    }
-    user, err := rbac.FromUserContext(req.Context())
-    if err != nil {
-        log.Error(err)
-        h.WriteError(w, err.Error(), http.StatusInternalServerError)
-        return
-    }
-    if user != nil {
-        clusterTaskConfig.Creator.Name = user.Username
-        clusterTaskConfig.Creator.Id = user.UserId
-    }
-
-    var totalDocs int64
-    for _, index := range clusterTaskConfig.Indices {
-        totalDocs += index.Source.Docs
-    }
-
-    srcClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Source.Id)
-    clusterTaskConfig.Cluster.Source.Distribution = srcClusterCfg.Distribution
-    dstClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Target.Id)
-    clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution
-    t := task2.Task{
-        Metadata: task2.Metadata{
-            Type: "cluster_migration",
-            Labels: util.MapStr{
-                "business_id":       "cluster_migration",
-                "source_cluster_id": clusterTaskConfig.Cluster.Source.Id,
-                "target_cluster_id": clusterTaskConfig.Cluster.Target.Id,
-                "source_total_docs": totalDocs,
-            },
-        },
-        Cancellable:  true,
-        Runnable:     false,
-        Status:       task2.StatusInit,
-        ConfigString: util.MustToJSON(clusterTaskConfig),
-    }
-    t.ID = util.GetUUID()
-    err = orm.Create(nil, &t)
-    if err != nil {
-        h.WriteError(w, err.Error(), http.StatusInternalServerError)
-        log.Error(err)
-        return
-    }
-    h.WriteCreatedOKJSON(w, t.ID)
-}
-
 func (h *APIHandler) getIndexPartitionInfo(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
     var (
         index = ps.MustGetParameter("index")
@@ -175,6 +121,7 @@ func (h *APIHandler) getDataMigrationTaskInfo(w http.ResponseWriter, req *http.R
         if percent > 100 {
             percent = 100
         }
+        taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceDocs
         taskConfig.Indices[i].Target.Docs = count
         taskConfig.Indices[i].Percent = util.ToFixed(percent, 2)
         taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
@@ -206,6 +153,7 @@ type TaskInfoResponse struct {
     DataPartition       int           `json:"data_partition"`
     CompletedPartitions int           `json:"completed_partitions"`
     Partitions          []util.MapStr `json:"partitions"`
+    Repeating           bool          `json:"repeating"`
 }
 
 func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@@ -219,12 +167,21 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
         return
     }
 
+    taskConfig := &migration_model.ClusterMigrationTaskConfig{}
+    err = migration_util.GetTaskConfig(&majorTask, taskConfig)
+    if err != nil {
+        log.Error(err)
+        h.WriteError(w, err.Error(), http.StatusInternalServerError)
+        return
+    }
+
     taskInfo := &TaskInfoResponse{
         TaskID:    id,
         StartTime: majorTask.StartTimeInMillis,
+        Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels),
     }
 
-    subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(&majorTask, uniqueIndexName)
+    subTasks, pipelineTaskIDs, pipelineSubParentIDs, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName)
 
     taskInfo.DataPartition = len(subTasks)
     if len(subTasks) == 0 {
@@ -246,7 +203,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
         }
     }
 
-    startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks)
+    startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating)
 
     var partitionTaskInfos []util.MapStr
 
@@ -487,7 +444,7 @@ func (h *APIHandler) refreshIndex(w http.ResponseWriter, req *http.Request, ps h
     }, 200)
 }
 
-func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexName string) (subTasks []task2.Task, runningPipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task2.Task, err error) {
+func (h *APIHandler) getChildTaskInfosByIndex(id string, uniqueIndexName string) (subTasks []task2.Task, runningPipelineTaskIDs map[string][]string, pipelineSubParentIDs map[string]string, parentIDPipelineTasks map[string][]task2.Task, err error) {
     queryDsl := util.MapStr{
         "size": 9999,
         "sort": []util.MapStr{
@@ -503,7 +460,7 @@ func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexN
             {
                 "term": util.MapStr{
                     "parent_id": util.MapStr{
-                        "value": taskItem.ID,
+                        "value": id,
                     },
                 },
             },
@@ -518,10 +475,7 @@ func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexN
             },
         },
     }
-    q := &orm.Query{
-        RawQuery: util.MustToJSONBytes(queryDsl),
-    }
-    err, result := orm.Search(task2.Task{}, q)
+    allTasks, err := migration_util.GetTasks(queryDsl)
     if err != nil {
         return
     }
@@ -530,25 +484,16 @@ func (h *APIHandler) getChildTaskInfosByIndex(taskItem *task2.Task, uniqueIndexN
     pipelineSubParentIDs = map[string]string{}
     parentIDPipelineTasks = map[string][]task2.Task{}
 
-    for _, row := range result.Result {
-        buf := util.MustToJSONBytes(row)
-        subTask := task2.Task{}
-        err = util.FromJSONBytes(buf, &subTask)
-        if err != nil {
-            log.Error(err)
-            continue
-        }
-
+    for _, subTask := range allTasks {
         if subTask.Metadata.Type != "pipeline" {
             subTasks = append(subTasks, subTask)
             continue
         }
 
-        // TODO: use more robust logic
         if pl := len(subTask.ParentId); pl != 2 {
             continue
         }
-        parentID := subTask.ParentId[1]
+        parentID := migration_util.GetDirectParentId(subTask.ParentId)
 
         pipelineSubParentIDs[subTask.ID] = parentID
         instID := migration_util.GetMapStringValue(util.MapStr(subTask.Metadata.Labels), "execution_instance_id")
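getChildTaskInfosByIndex now delegates querying and parent resolution to migration_util helpers whose bodies are not part of this commit. A hedged sketch of what GetDirectParentId presumably does, given that the old code read ParentId[1] of a two-element chain:

// Sketch only, not taken from this commit: assumes the direct parent is the last
// id in the ParentId chain built by the split/repeat logic.
func GetDirectParentId(parentIDs []string) string {
	if len(parentIDs) == 0 {
		return ""
	}
	return parentIDs[len(parentIDs)-1]
}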
@@ -590,7 +535,7 @@ func (h *APIHandler) getChildPipelineInfosFromGateway(pipelineTaskIDs map[string
     return
 }
 
-func (h *APIHandler) calcMajorTaskInfo(subTasks []task2.Task) (startTime int64, completedTime int64, duration int64, completedPartitions int) {
+func (h *APIHandler) calcMajorTaskInfo(subTasks []task2.Task, repeating bool) (startTime int64, completedTime int64, duration int64, completedPartitions int) {
     if len(subTasks) == 0 {
         return
     }
@@ -615,7 +560,7 @@ func (h *APIHandler) calcMajorTaskInfo(subTasks []task2.Task) (startTime int64,
             completedPartitions++
         }
     }
-    if len(subTasks) != completedPartitions {
+    if len(subTasks) != completedPartitions || repeating {
         completedTime = 0
         duration = time.Now().UnixMilli() - startTime
     } else {
@@ -1,6 +1,7 @@
 package cluster_comparison
 
 import (
+    "context"
     "fmt"
     "time"
 
@@ -76,13 +77,27 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
     esSourceClient := elastic.GetClient(clusterComparisonTask.Cluster.Source.Id)
     esTargetClient := elastic.GetClient(clusterComparisonTask.Cluster.Target.Id)
 
-    err = migration_util.DeleteChildTasks(taskItem.ID)
+    err = migration_util.DeleteChildTasks(taskItem.ID, "index_comparison")
     if err != nil {
         log.Warnf("failed to clear child tasks, err: %v", err)
         return nil
     }
 
-    for _, index := range clusterComparisonTask.Indices {
+    current := migration_util.GetMapIntValue(taskItem.Metadata.Labels, "next_run_time")
+    var step time.Duration
+    if clusterComparisonTask.Settings.Execution.Repeat != nil {
+        step = clusterComparisonTask.Settings.Execution.Repeat.Interval
+    }
+
+    var pids []string
+    pids = append(pids, taskItem.ParentId...)
+    pids = append(pids, taskItem.ID)
+
+    var sourceTotalDocs int64
+    var targetTotalDocs int64
+    ctx := context.Background()
+
+    for i, index := range clusterComparisonTask.Indices {
         sourceDump := migration_model.IndexComparisonDumpConfig{
             ClusterId: clusterComparisonTask.Cluster.Source.Id,
             Indices:   index.Source.Name,
@@ -105,6 +120,13 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
         if index.RawFilter != nil {
             must = append(must, index.RawFilter)
         }
+        if index.Incremental != nil {
+            incrementalFilter, err := index.Incremental.BuildFilter(current, step)
+            if err != nil {
+                return err
+            }
+            must = append(must, incrementalFilter)
+        }
         if len(must) > 0 {
             sourceDump.QueryDSL = util.MapStr{
                 "bool": util.MapStr{
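BuildFilter's implementation is not shown in these hunks; it is only called with the run's next_run_time label and the repeat interval. As a hedged illustration of what such an incremental filter typically looks like for a time field, under the assumption that the incremental config names a field and a delay and that each run covers one interval-sized window:

package migration_example

import (
	"time"

	"infini.sh/framework/core/util"
)

// Illustrative only (not from this commit): limits a run to the window
// (current-step, current], shifted back by the configured delay.
func buildIncrementalFilter(field string, current int64, step, delay time.Duration) util.MapStr {
	end := current - delay.Milliseconds()
	start := end - step.Milliseconds()
	return util.MapStr{
		"range": util.MapStr{
			field: util.MapStr{
				"gt":     start,
				"lte":    end,
				"format": "epoch_millis",
			},
		},
	}
}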
@@ -119,9 +141,31 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
         targetDump.Indices = index.Target.Name
         targetDump.DocCount = index.Target.Docs
 
+        var countQuery = util.MapStr{}
+        if sourceDump.QueryDSL != nil {
+            countQuery = util.MapStr{
+                "query": sourceDump.QueryDSL,
+            }
+        }
+        sourceCount, err := esSourceClient.Count(ctx, index.Source.Name, util.MustToJSONBytes(countQuery))
+        if err != nil {
+            return err
+        }
+        clusterComparisonTask.Indices[i].Source.Docs = sourceCount.Count
+        sourceTotalDocs += sourceCount.Count
+        sourceDump.DocCount = sourceCount.Count
+
+        targetCount, err := esTargetClient.Count(ctx, index.Target.Name, util.MustToJSONBytes(countQuery))
+        if err != nil {
+            return err
+        }
+        clusterComparisonTask.Indices[i].Target.Docs = targetCount.Count
+        targetTotalDocs += targetCount.Count
+        targetDump.DocCount = targetCount.Count
+
         if index.Partition == nil {
             indexComparisonTask := task.Task{
-                ParentId:    []string{taskItem.ID},
+                ParentId:    pids,
                 Cancellable: true,
                 Runnable:    false,
                 Status:      task.StatusReady,
@@ -182,7 +226,7 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
         partitions := elastic.MergePartitions(sourcePartitions, targetPartitions, index.Partition.FieldName, index.Partition.FieldType, targetPartitionQ.Filter)
 
         if len(partitions) == 0 {
-            return fmt.Errorf("empty partitions")
+            continue
         }
 
         var (
@@ -204,7 +248,7 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
         partitionTargetDump.Indices = index.Target.Name
 
         partitionComparisonTask := task.Task{
-            ParentId:    []string{taskItem.ID},
+            ParentId:    pids,
             Cancellable: false,
             Runnable:    true,
             Status:      task.StatusReady,
@@ -232,7 +276,10 @@ func (p *processor) splitMajorTask(taskItem *task.Task) error {
         }
     }
 
+    taskItem.ConfigString = util.MustToJSON(clusterComparisonTask)
     taskItem.Metadata.Labels["is_split"] = true
+    taskItem.Metadata.Labels["source_total_docs"] = sourceTotalDocs
+    taskItem.Metadata.Labels["target_total_docs"] = targetTotalDocs
     p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
         Success: true,
     }, fmt.Sprintf("major task [%s] splitted", taskItem.ID))
@@ -336,7 +383,7 @@ func (p *processor) handlePendingStopMajorTask(taskItem *task.Task) error {
         return nil
     }
 
-    tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "index_comparison")
+    tasks, err := migration_util.GetPendingChildTasks(taskItem.ID, "index_comparison")
     if err != nil {
         log.Errorf("failed to get sub tasks, err: %v", err)
         return nil
@@ -0,0 +1,124 @@
+package cluster_comparison
+
+import (
+    "errors"
+    "fmt"
+
+    migration_model "infini.sh/console/plugin/migration/model"
+    migration_util "infini.sh/console/plugin/migration/util"
+    "infini.sh/framework/core/api/rbac"
+    "infini.sh/framework/core/elastic"
+    "infini.sh/framework/core/orm"
+    "infini.sh/framework/core/task"
+    "infini.sh/framework/core/util"
+)
+
+func RepeatTask(oldTask *task.Task) (*task.Task, error) {
+    config := migration_model.ClusterComparisonTaskConfig{}
+    err := migration_util.GetTaskConfig(oldTask, &config)
+    if err != nil {
+        return nil, err
+    }
+
+    t, err := buildTask(&config, &rbac.ShortUser{
+        Username: config.Creator.Name,
+        UserId:   config.Creator.Id,
+    }, true)
+    if err != nil {
+        return nil, err
+    }
+    t.ParentId = []string{oldTask.ID}
+    if len(oldTask.ParentId) > 0 {
+        t.ParentId = oldTask.ParentId
+    }
+
+    migration_util.CopyRepeatState(oldTask.Metadata.Labels, t.Metadata.Labels)
+    err = migration_util.UpdateRepeatState(config.Settings.Execution.Repeat, t.Metadata.Labels)
+    if err != nil {
+        return nil, fmt.Errorf("repeat invalid: %v", err)
+    }
+
+    err = orm.Create(nil, t)
+    if err != nil {
+        return nil, err
+    }
+    return t, nil
+}
+
+func CreateTask(config *migration_model.ClusterComparisonTaskConfig, creator *rbac.ShortUser) (*task.Task, error) {
+    t, err := buildTask(config, creator, false)
+    if err != nil {
+        return nil, err
+    }
+
+    err = migration_util.UpdateRepeatState(config.Settings.Execution.Repeat, t.Metadata.Labels)
+    if err != nil {
+        return nil, fmt.Errorf("repeat invalid: %v", err)
+    }
+
+    err = orm.Create(nil, t)
+    if err != nil {
+        return nil, err
+    }
+    return t, nil
+}
+
+func buildTask(config *migration_model.ClusterComparisonTaskConfig, creator *rbac.ShortUser, repeat bool) (*task.Task, error) {
+    if len(config.Indices) == 0 {
+        return nil, errors.New("indices must not be empty")
+    }
+    if creator == nil {
+        return nil, errors.New("missing creator info")
+    }
+    config.Creator.Name = creator.Username
+    config.Creator.Id = creator.UserId
+
+    srcClusterCfg := elastic.GetConfig(config.Cluster.Source.Id)
+    config.Cluster.Source.Distribution = srcClusterCfg.Distribution
+    dstClusterCfg := elastic.GetConfig(config.Cluster.Target.Id)
+    config.Cluster.Target.Distribution = dstClusterCfg.Distribution
+
+    clearTaskConfig(config)
+
+    var sourceTotalDocs int64
+    var targetTotalDocs int64
+
+    for _, index := range config.Indices {
+        if index.Incremental != nil {
+            if repeat {
+                index.Incremental.Full = false
+            } else {
+                index.Incremental.Full = true
+            }
+        }
+        sourceTotalDocs += index.Source.Docs
+        targetTotalDocs += index.Target.Docs
+    }
+
+    t := task.Task{
+        Metadata: task.Metadata{
+            Type: "cluster_comparison",
+            Labels: util.MapStr{
+                "business_id":       "cluster_comparison",
+                "source_cluster_id": config.Cluster.Source.Id,
+                "target_cluster_id": config.Cluster.Target.Id,
+                "source_total_docs": sourceTotalDocs,
+                "target_total_docs": targetTotalDocs,
+            },
+        },
+        Cancellable:  true,
+        Runnable:     false,
+        Status:       task.StatusInit,
+        ConfigString: util.MustToJSON(config),
+    }
+    t.ID = util.GetUUID()
+    return &t, nil
+}
+
+// sync with getDataComparisonTaskInfo
+func clearTaskConfig(config *migration_model.ClusterComparisonTaskConfig) {
+    for i := range config.Indices {
+        config.Indices[i].ScrollPercent = 0
+        config.Indices[i].ErrorPartitions = 0
+    }
+}
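The new file above centralizes task creation for the comparison flow. A hedged sketch of how the two constructors are intended to be used: the CreateTask call matches the createDataComparisonTask handler further down in this commit, while the RepeatTask call site (some scheduler or processor that fires once next_run_time has passed) is an assumption, not shown in these hunks.

package example

import (
	"infini.sh/console/plugin/migration/cluster_comparison"
	"infini.sh/framework/core/task"
)

// scheduleNextComparisonRun is a hypothetical caller name. RepeatTask rebuilds the
// config from the previous run, copies the repeat-state labels, validates the repeat
// settings, and persists the new cluster-level task chained under the original root.
func scheduleNextComparisonRun(previous *task.Task) (*task.Task, error) {
	return cluster_comparison.RepeatTask(previous)
}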
@@ -1,6 +1,7 @@
 package cluster_migration
 
 import (
+    "context"
     "fmt"
     "time"
 
@@ -78,21 +79,36 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
     esSourceClient := elastic.GetClient(clusterMigrationTask.Cluster.Source.Id)
     targetType := common.GetClusterDocType(clusterMigrationTask.Cluster.Target.Id)
 
-    err = migration_util.DeleteChildTasks(taskItem.ID)
+    err = migration_util.DeleteChildTasks(taskItem.ID, "index_migration")
     if err != nil {
         log.Warnf("failed to clear child tasks, err: %v", err)
         return nil
     }
 
-    for _, index := range clusterMigrationTask.Indices {
+    current := migration_util.GetMapIntValue(taskItem.Metadata.Labels, "next_run_time")
+    var step time.Duration
+    if clusterMigrationTask.Settings.Execution.Repeat != nil {
+        step = clusterMigrationTask.Settings.Execution.Repeat.Interval
+    }
+
+    var pids []string
+    pids = append(pids, taskItem.ParentId...)
+    pids = append(pids, taskItem.ID)
+
+    var totalDocs int64
+    ctx := context.Background()
+
+    for i, index := range clusterMigrationTask.Indices {
         source := migration_model.IndexMigrationSourceConfig{
             ClusterId:      clusterMigrationTask.Cluster.Source.Id,
             Indices:        index.Source.Name,
+            DocCount:       index.Source.Docs,
             SliceSize:      clusterMigrationTask.Settings.Scroll.SliceSize,
             BatchSize:      clusterMigrationTask.Settings.Scroll.Docs,
             ScrollTime:     clusterMigrationTask.Settings.Scroll.Timeout,
             SkipCountCheck: clusterMigrationTask.Settings.SkipScrollCountCheck,
         }
 
         if index.IndexRename != nil {
             source.IndexRename = index.IndexRename
         }
@@ -112,6 +128,13 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
         if index.RawFilter != nil {
             must = append(must, index.RawFilter)
         }
+        if index.Incremental != nil {
+            incrementalFilter, err := index.Incremental.BuildFilter(current, step)
+            if err != nil {
+                return err
+            }
+            must = append(must, incrementalFilter)
+        }
         if index.Source.DocType != "" {
             if index.Target.DocType != "" {
                 source.TypeRename = util.MapStr{
@@ -139,6 +162,25 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
             }
         }
 
+        // NOTE: for repeating tasks, frontend can't get the accurate doc count
+        // update here before we split the task, if the index has incremental config and correct delay value,
+        // the doc count will not change and will be stable
+        var countQuery = util.MapStr{}
+        if source.QueryDSL != nil {
+            countQuery = util.MapStr{
+                "query": source.QueryDSL,
+            }
+        }
+        sourceCount, err := esSourceClient.Count(ctx, index.Source.Name, util.MustToJSONBytes(countQuery))
+        if err != nil {
+            log.Errorf("failed to count docs, err: %v", err)
+            return err
+        }
+
+        clusterMigrationTask.Indices[i].Source.Docs = sourceCount.Count
+        source.DocCount = sourceCount.Count
+        totalDocs += sourceCount.Count
+
         target := migration_model.IndexMigrationTargetConfig{
             ClusterId:      clusterMigrationTask.Cluster.Target.Id,
             SkipCountCheck: clusterMigrationTask.Settings.SkipBulkCountCheck,
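The count body above simply wraps the scroll query, so the totals reported on the task match what the dump step will actually read; when QueryDSL is nil the body stays an empty object and counts everything. A small worked example of the wrapping (the field names are made up for illustration):

package main

import (
	"fmt"

	"infini.sh/framework/core/util"
)

func main() {
	// Only the wrapping shape comes from the hunk above; the filter content is illustrative.
	queryDSL := util.MapStr{
		"bool": util.MapStr{
			"must": []interface{}{
				util.MapStr{"term": util.MapStr{"status": "published"}},
			},
		},
	}
	countQuery := util.MapStr{"query": queryDSL} // stays {} (match all) when QueryDSL is nil
	fmt.Println(util.MustToJSON(countQuery))
	// => {"query":{"bool":{"must":[{"term":{"status":"published"}}]}}}
}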
@@ -166,8 +208,8 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
         if err != nil {
             return err
         }
-        if partitions == nil || len(partitions) == 0 {
-            return fmt.Errorf("empty data with filter: %s", util.MustToJSON(index.RawFilter))
+        if len(partitions) == 0 {
+            continue
         }
         var (
             partitionID int
@@ -188,7 +230,7 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
         partitionSource.QueryString = ""
 
         partitionMigrationTask := task.Task{
-            ParentId:    []string{taskItem.ID},
+            ParentId:    pids,
             Cancellable: false,
             Runnable:    true,
             Status:      task.StatusReady,
@@ -217,10 +259,8 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
 
         }
     } else {
-        source.DocCount = index.Source.Docs
-
         indexMigrationTask := task.Task{
-            ParentId:    []string{taskItem.ID},
+            ParentId:    pids,
             Cancellable: true,
             Runnable:    false,
             Status:      task.StatusReady,
@@ -251,7 +291,9 @@ func (p *processor) splitMajorMigrationTask(taskItem *task.Task) error {
             }
         }
     }
+    taskItem.ConfigString = util.MustToJSON(clusterMigrationTask)
     taskItem.Metadata.Labels["is_split"] = true
+    taskItem.Metadata.Labels["source_total_docs"] = totalDocs
     p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
         Success: true,
     }, fmt.Sprintf("major task [%s] splitted", taskItem.ID))
@@ -368,7 +410,7 @@ func (p *processor) handlePendingStopMajorTask(taskItem *task.Task) error {
         return nil
     }
 
-    tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "index_migration")
+    tasks, err := migration_util.GetPendingChildTasks(taskItem.ID, "index_migration")
     if err != nil {
         log.Errorf("failed to get sub tasks, err: %v", err)
         return nil
@@ -0,0 +1,122 @@
+package cluster_migration
+
+import (
+    "errors"
+    "fmt"
+
+    migration_model "infini.sh/console/plugin/migration/model"
+    migration_util "infini.sh/console/plugin/migration/util"
+    "infini.sh/framework/core/api/rbac"
+    "infini.sh/framework/core/elastic"
+    "infini.sh/framework/core/orm"
+    "infini.sh/framework/core/task"
+    "infini.sh/framework/core/util"
+)
+
+func RepeatTask(oldTask *task.Task) (*task.Task, error) {
+    config := migration_model.ClusterMigrationTaskConfig{}
+    err := migration_util.GetTaskConfig(oldTask, &config)
+    if err != nil {
+        return nil, err
+    }
+
+    t, err := buildTask(&config, &rbac.ShortUser{
+        Username: config.Creator.Name,
+        UserId:   config.Creator.Id,
+    }, true)
+    if err != nil {
+        return nil, err
+    }
+    t.ParentId = []string{oldTask.ID}
+    if len(oldTask.ParentId) > 0 {
+        t.ParentId = oldTask.ParentId
+    }
+
+    migration_util.CopyRepeatState(oldTask.Metadata.Labels, t.Metadata.Labels)
+    err = migration_util.UpdateRepeatState(config.Settings.Execution.Repeat, t.Metadata.Labels)
+    if err != nil {
+        return nil, fmt.Errorf("repeat invalid: %v", err)
+    }
+
+    err = orm.Create(nil, t)
+    if err != nil {
+        return nil, err
+    }
+    return t, nil
+}
+
+func CreateTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac.ShortUser) (*task.Task, error) {
+    t, err := buildTask(config, creator, false)
+    if err != nil {
+        return nil, err
+    }
+
+    err = migration_util.UpdateRepeatState(config.Settings.Execution.Repeat, t.Metadata.Labels)
+    if err != nil {
+        return nil, fmt.Errorf("repeat invalid: %v", err)
+    }
+
+    err = orm.Create(nil, t)
+    if err != nil {
+        return nil, err
+    }
+    return t, nil
+}
+
+func buildTask(config *migration_model.ClusterMigrationTaskConfig, creator *rbac.ShortUser, repeat bool) (*task.Task, error) {
+    if len(config.Indices) == 0 {
+        return nil, errors.New("indices must not be empty")
+    }
+    if creator == nil {
+        return nil, errors.New("missing creator info")
+    }
+
+    config.Creator.Name = creator.Username
+    config.Creator.Id = creator.UserId
+
+    srcClusterCfg := elastic.GetConfig(config.Cluster.Source.Id)
+    config.Cluster.Source.Distribution = srcClusterCfg.Distribution
+    dstClusterCfg := elastic.GetConfig(config.Cluster.Target.Id)
+    config.Cluster.Target.Distribution = dstClusterCfg.Distribution
+
+    clearTaskConfig(config)
+
+    var totalDocs int64
+    for _, index := range config.Indices {
+        if index.Incremental != nil {
+            if repeat {
+                index.Incremental.Full = false
+            } else {
+                index.Incremental.Full = true
+            }
+        }
+        totalDocs += index.Source.Docs
+    }
+
+    t := task.Task{
+        Metadata: task.Metadata{
+            Type: "cluster_migration",
+            Labels: util.MapStr{
+                "business_id":       "cluster_migration",
+                "source_cluster_id": config.Cluster.Source.Id,
+                "target_cluster_id": config.Cluster.Target.Id,
+                "source_total_docs": totalDocs,
+            },
+        },
+        Cancellable:  true,
+        Runnable:     false,
+        Status:       task.StatusInit,
+        ConfigString: util.MustToJSON(config),
+    }
+    t.ID = util.GetUUID()
+    return &t, nil
+}
+
+// sync with getDataMigrationTaskInfo
+func clearTaskConfig(config *migration_model.ClusterMigrationTaskConfig) {
+    for i := range config.Indices {
+        config.Indices[i].Target.Docs = 0
+        config.Indices[i].Percent = 0
+        config.Indices[i].ErrorPartitions = 0
+    }
+}
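buildTask flips Incremental.Full depending on whether the run is the first one or a repeat. The intent is inferred from the flag name rather than stated anywhere in the diff; under that assumption, a one-line restatement of the behaviour:

// Assumption (not defined in this diff): Incremental.Full == true means the run should
// cover all existing documents, false means only the incremental window. CreateTask
// passes repeat=false and RepeatTask passes repeat=true, so only the first run of a
// chain is a full pass.
func isFullRun(repeat bool) bool {
	return !repeat
}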
@@ -5,6 +5,7 @@ import (
     "fmt"
     "net/http"
     "strconv"
+    "time"
 
     log "github.com/cihub/seelog"
 
@@ -64,6 +65,11 @@ func (h *APIHandler) searchTask(taskType string) func(w http.ResponseWriter, req
         "query": util.MapStr{
             "bool": util.MapStr{
                 "must": mustQ,
+                "must_not": util.MapStr{
+                    "exists": util.MapStr{
+                        "field": "parent_id",
+                    },
+                },
             },
         },
     }
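The new must_not clause exists because repeat runs created by RepeatTask carry a parent_id: excluding documents that have the field keeps the list API limited to the original cluster-level tasks. A small sketch that prints the shape of the clause added above (the taskType must conditions are elided for brevity):

package main

import (
	"fmt"

	"infini.sh/framework/core/util"
)

func main() {
	query := util.MapStr{
		"bool": util.MapStr{
			"must_not": util.MapStr{
				"exists": util.MapStr{"field": "parent_id"},
			},
		},
	}
	fmt.Println(util.MustToJSON(query))
	// => {"bool":{"must_not":{"exists":{"field":"parent_id"}}}}
}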
@@ -110,6 +116,14 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
             return
         }
         sourceM.Put("metadata.labels.target_total_docs", ts.IndexDocs)
+        sourceM.Put("metadata.labels.source_total_docs", ts.SourceDocs)
+        sourceM.Put("metadata.labels.error_partitions", ts.ErrorPartitions)
+        count, err := migration_util.CountRunningChildren(taskID, "index_migration")
+        if err != nil {
+            log.Warnf("failed to count running children, err: %v", err)
+            return
+        }
+        sourceM.Put("running_children", count)
     case "cluster_comparison":
         ts, _, err := h.getComparisonMajorTaskInfo(taskID)
         if err != nil {
@@ -117,8 +131,23 @@ func (h *APIHandler) populateMajorTaskInfo(taskID string, sourceM util.MapStr) {
             return
         }
         sourceM.Put("metadata.labels.source_scroll_docs", ts.SourceScrollDocs)
+        sourceM.Put("metadata.labels.source_total_docs", ts.SourceTotalDocs)
+        sourceM.Put("metadata.labels.target_total_docs", ts.TargetTotalDocs)
         sourceM.Put("metadata.labels.target_scroll_docs", ts.TargetScrollDocs)
+        sourceM.Put("metadata.labels.total_diff_docs", ts.TotalDiffDocs)
+        count, err := migration_util.CountRunningChildren(taskID, "index_comparison")
+        if err != nil {
+            log.Warnf("failed to count running children, err: %v", err)
+            return
         }
+        sourceM.Put("running_children", count)
+    }
+    _, repeatStatus, err := h.calcRepeatingStatus(&majorTask)
+    if err != nil {
+        log.Warnf("failed to calc repeat info, err: %v", err)
+        return
+    }
+    sourceM.Put("repeat", repeatStatus)
 }
 
 func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
@@ -136,6 +165,13 @@ func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps http
         return
     }
     obj.Status = task.StatusReady
+    if _, ok := obj.Metadata.Labels["next_run_time"]; !ok {
+        startTime := time.Now().UnixMilli()
+        // only set for initial cluster-level tasks
+        if len(obj.ParentId) == 0 {
+            obj.Metadata.Labels["next_run_time"] = startTime
+        }
+    }
 
     err = orm.Update(nil, &obj)
     if err != nil {
@@ -172,41 +208,6 @@ func (h *APIHandler) startTask(w http.ResponseWriter, req *http.Request, ps http
     }, 200)
 }
 
-func (h *APIHandler) stopTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
-    id := ps.MustGetParameter("task_id")
-    obj := task.Task{}
-    obj.ID = id
-
-    exists, err := orm.Get(&obj)
-    if !exists || err != nil {
-        h.WriteJSON(w, util.MapStr{
-            "_id":   id,
-            "found": false,
-        }, http.StatusNotFound)
-        return
-    }
-    if task.IsEnded(obj.Status) {
-        h.WriteJSON(w, util.MapStr{
-            "success": true,
-        }, 200)
-        return
-    }
-    obj.Status = task.StatusPendingStop
-    err = orm.Update(nil, &obj)
-    if err != nil {
-        log.Error(err)
-        h.WriteError(w, err.Error(), http.StatusInternalServerError)
-        return
-    }
-    migration_util.WriteLog(&obj, &task.TaskResult{
-        Success: true,
-    }, "task status manually set to pending stop")
-
-    h.WriteJSON(w, util.MapStr{
-        "success": true,
-    }, 200)
-}
-
 // delete task and all sub tasks
 func (h *APIHandler) deleteTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
     id := ps.MustGetParameter("task_id")
@@ -266,3 +267,172 @@ func (h *APIHandler) deleteTask(w http.ResponseWriter, req *http.Request, ps htt
         "result": "deleted",
     }, 200)
 }
+
+// resume a repeating task
+func (h *APIHandler) resumeTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+    taskID := ps.MustGetParameter("task_id")
+    obj := task.Task{}
+
+    obj.ID = taskID
+    exists, err := orm.Get(&obj)
+    if !exists || err != nil {
+        h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError)
+        return
+    }
+    if len(obj.ParentId) > 0 {
+        h.WriteError(w, fmt.Sprintf("can't resume on a child task [%s]", taskID), http.StatusInternalServerError)
+        return
+    }
+    lastRepeatingChild, repeatStatus, err := h.calcRepeatingStatus(&obj)
+    if err != nil {
+        h.WriteError(w, fmt.Sprintf("failed to get repeating status of task [%s]", taskID), http.StatusInternalServerError)
+        return
+    }
+    if !repeatStatus.IsRepeat {
+        h.WriteError(w, fmt.Sprintf("task [%s] is not a repeating task", taskID), http.StatusInternalServerError)
+        return
+    }
+    if repeatStatus.Done {
+        h.WriteError(w, fmt.Sprintf("repeat task [%s] is already done", taskID), http.StatusInternalServerError)
+        return
+    }
+    if repeatStatus.Repeating {
+        h.WriteJSON(w, util.MapStr{
+            "success": true,
+        }, 200)
+        return
+    }
+
+    lastRepeatingChild.Metadata.Labels["repeat_triggered"] = false
+    err = orm.Update(nil, lastRepeatingChild)
+    if err != nil {
+        log.Errorf("failed to update last child, err: %v", err)
+        h.WriteError(w, err.Error(), http.StatusInternalServerError)
+        return
+    }
+
+    migration_util.WriteLog(&obj, nil, fmt.Sprintf("repeating [%s] task [%s] manually resumed", obj.Metadata.Type, taskID))
+    h.WriteJSON(w, util.MapStr{
+        "success": true,
+    }, 200)
+    return
+}
+
+type RepeatStatus struct {
+    IsRepeat  bool `json:"is_repeat"`
+    Done      bool `json:"done"`
+    Repeating bool `json:"repeating"`
+}
+
+func (h *APIHandler) calcRepeatingStatus(taskItem *task.Task) (*task.Task, *RepeatStatus, error) {
+    ret := &RepeatStatus{}
+    lastRepeatingChild, err := migration_util.GetLastRepeatingChildTask(taskItem.ID, taskItem.Metadata.Type)
+    if err != nil {
+        return nil, nil, err
+    }
+    if lastRepeatingChild == nil {
+        lastRepeatingChild = taskItem
+    }
+
+    isRepeat := migration_util.GetMapBoolValue(lastRepeatingChild.Metadata.Labels, "is_repeat")
+    if !isRepeat {
+        return lastRepeatingChild, ret, nil
+    }
+    ret.IsRepeat = isRepeat
+
+    repeatDone := migration_util.GetMapBoolValue(lastRepeatingChild.Metadata.Labels, "repeat_done")
+    if repeatDone {
+        ret.Done = true
+        return lastRepeatingChild, ret, nil
+    }
+    repeatTriggered := migration_util.GetMapBoolValue(lastRepeatingChild.Metadata.Labels, "repeat_triggered")
+    if !repeatTriggered {
+        ret.Repeating = true
+    }
+    return lastRepeatingChild, ret, nil
+}
+
+func (h *APIHandler) stopTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+    id := ps.MustGetParameter("task_id")
+    obj := task.Task{}
+    obj.ID = id
+
+    exists, err := orm.Get(&obj)
+    if !exists || err != nil {
+        h.WriteJSON(w, util.MapStr{
+            "_id":   id,
+            "found": false,
+        }, http.StatusNotFound)
+        return
+    }
+    if task.IsEnded(obj.Status) {
+        h.WriteJSON(w, util.MapStr{
+            "success": true,
+        }, 200)
+        return
+    }
+    obj.Status = task.StatusPendingStop
+    err = orm.Update(nil, &obj)
+    if err != nil {
+        log.Error(err)
+        h.WriteError(w, err.Error(), http.StatusInternalServerError)
+        return
+    }
+    migration_util.WriteLog(&obj, &task.TaskResult{
+        Success: true,
+    }, "task status manually set to pending stop")
+
+    h.WriteJSON(w, util.MapStr{
+        "success": true,
+    }, 200)
+}
+
+// pause a repeating task
+func (h *APIHandler) pauseTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+    taskID := ps.MustGetParameter("task_id")
+    obj := task.Task{}
+
+    obj.ID = taskID
+    exists, err := orm.Get(&obj)
+    if !exists || err != nil {
+        h.WriteError(w, fmt.Sprintf("task [%s] not found", taskID), http.StatusInternalServerError)
+        return
+    }
+    if len(obj.ParentId) > 0 {
+        h.WriteError(w, fmt.Sprintf("can't pause on a child task [%s]", taskID), http.StatusInternalServerError)
+        return
+    }
+    lastRepeatingChild, repeatStatus, err := h.calcRepeatingStatus(&obj)
+    if err != nil {
+        h.WriteError(w, fmt.Sprintf("failed to get repeating status of task [%s]", taskID), http.StatusInternalServerError)
+        return
+    }
+    if !repeatStatus.IsRepeat {
+        h.WriteError(w, fmt.Sprintf("task [%s] is not a repeating task", taskID), http.StatusInternalServerError)
+        return
+    }
+    if repeatStatus.Done {
+        h.WriteError(w, fmt.Sprintf("repeat task [%s] is already done", taskID), http.StatusInternalServerError)
+        return
+    }
+    if !repeatStatus.Repeating {
+        h.WriteJSON(w, util.MapStr{
+            "success": true,
+        }, 200)
+        return
+    }
+
+    lastRepeatingChild.Metadata.Labels["repeat_triggered"] = true
+    err = orm.Update(nil, lastRepeatingChild)
+    if err != nil {
+        log.Errorf("failed to update last child, err: %v", err)
+        h.WriteError(w, err.Error(), http.StatusInternalServerError)
+        return
+    }
+
+    migration_util.WriteLog(&obj, nil, fmt.Sprintf("repeating task [%s] manually paused", taskID))
+    h.WriteJSON(w, util.MapStr{
+        "success": true,
+    }, 200)
+    return
+}
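calcRepeatingStatus derives the repeat state of a cluster-level task from the labels on its most recent repeating child. The util helpers it calls are not part of these hunks, so the label semantics below are inferred from how the handlers read and write them; a hedged restatement as a pure function:

// Sketch only: mirrors the decision in calcRepeatingStatus above.
//   is_repeat        - the chain was created with a repeat setting
//   repeat_done      - the chain has finished (or the repeat config was marked invalid)
//   repeat_triggered - presumably set once the next run has been (or should no longer
//                      be) spawned; pauseTask forces it to true, resumeTask resets it
func repeatStateOf(isRepeat, repeatDone, repeatTriggered bool) RepeatStatus {
	st := RepeatStatus{IsRepeat: isRepeat}
	if !isRepeat {
		return st
	}
	if repeatDone {
		st.Done = true
		return st
	}
	st.Repeating = !repeatTriggered
	return st
}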
@@ -7,12 +7,12 @@ import (
 
     log "github.com/cihub/seelog"
 
+    "infini.sh/console/plugin/migration/cluster_comparison"
     migration_model "infini.sh/console/plugin/migration/model"
     migration_util "infini.sh/console/plugin/migration/util"
 
     "infini.sh/framework/core/api/rbac"
     httprouter "infini.sh/framework/core/api/router"
-    "infini.sh/framework/core/elastic"
     "infini.sh/framework/core/global"
     "infini.sh/framework/core/orm"
     "infini.sh/framework/core/task"
@@ -27,50 +27,13 @@ func (h *APIHandler) createDataComparisonTask(w http.ResponseWriter, req *http.R
         h.WriteError(w, err.Error(), http.StatusInternalServerError)
         return
     }
-    if len(clusterTaskConfig.Indices) == 0 {
-        h.WriteError(w, "indices must not be empty", http.StatusInternalServerError)
-        return
-    }
     user, err := rbac.FromUserContext(req.Context())
     if err != nil {
         log.Error(err)
         h.WriteError(w, err.Error(), http.StatusInternalServerError)
         return
     }
-    if user != nil {
-        clusterTaskConfig.Creator.Name = user.Username
-        clusterTaskConfig.Creator.Id = user.UserId
-    }
-
-    var sourceTotalDocs int64
-    var targetTotalDocs int64
-    for _, index := range clusterTaskConfig.Indices {
-        sourceTotalDocs += index.Source.Docs
-        targetTotalDocs += index.Target.Docs
-    }
-
-    srcClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Source.Id)
-    clusterTaskConfig.Cluster.Source.Distribution = srcClusterCfg.Distribution
-    dstClusterCfg := elastic.GetConfig(clusterTaskConfig.Cluster.Target.Id)
-    clusterTaskConfig.Cluster.Target.Distribution = dstClusterCfg.Distribution
-    t := task.Task{
-        Metadata: task.Metadata{
-            Type: "cluster_comparison",
-            Labels: util.MapStr{
-                "business_id":       "cluster_comparison",
-                "source_cluster_id": clusterTaskConfig.Cluster.Source.Id,
-                "target_cluster_id": clusterTaskConfig.Cluster.Target.Id,
-                "source_total_docs": sourceTotalDocs,
-                "target_total_docs": targetTotalDocs,
-            },
-        },
-        Cancellable:  true,
-        Runnable:     false,
-        Status:       task.StatusInit,
-        ConfigString: util.MustToJSON(clusterTaskConfig),
-    }
-    t.ID = util.GetUUID()
-    err = orm.Create(nil, &t)
+    t, err := cluster_comparison.CreateTask(clusterTaskConfig, user)
     if err != nil {
         h.WriteError(w, err.Error(), http.StatusInternalServerError)
         log.Error(err)
@@ -116,6 +79,8 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.
         if percent > 100 {
             percent = 100
         }
+        taskConfig.Indices[i].Source.Docs = indexState[indexName].SourceTotalDocs
+        taskConfig.Indices[i].Target.Docs = indexState[indexName].TargetTotalDocs
         taskConfig.Indices[i].ScrollPercent = util.ToFixed(percent, 2)
         taskConfig.Indices[i].ErrorPartitions = indexState[indexName].ErrorPartitions
         if count == index.Source.Docs+index.Target.Docs {
@@ -137,14 +102,27 @@ func (h *APIHandler) getDataComparisonTaskInfo(w http.ResponseWriter, req *http.
     h.WriteJSON(w, obj, http.StatusOK)
 }
 
+type ClusterComparisonTaskState struct {
+    SourceTotalDocs  int64
+    SourceScrollDocs int64
+    TargetTotalDocs  int64
+    TargetScrollDocs int64
+    TotalDiffDocs    int64
+}
+
 type ComparisonIndexStateInfo struct {
     ErrorPartitions  int
+    SourceTotalDocs  int64
     SourceScrollDocs int64
+    TargetTotalDocs  int64
     TargetScrollDocs int64
+    TotalDiffDocs    int64
 }
 
 // TODO: calc realtime info from instance
-func (h *APIHandler) getComparisonMajorTaskInfo(majorTaskID string) (taskStats migration_model.ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) {
+func (h *APIHandler) getComparisonMajorTaskInfo(taskID string) (taskStats ClusterComparisonTaskState, indexState map[string]ComparisonIndexStateInfo, err error) {
+    indexState = map[string]ComparisonIndexStateInfo{}
+
     taskQuery := util.MapStr{
         "size": 500,
         "query": util.MapStr{
@@ -153,24 +131,10 @@ func (h *APIHandler) getComparisonMajorTaskInfo(majorTaskID string) (taskStats m
                 {
                     "term": util.MapStr{
                         "parent_id": util.MapStr{
-                            "value": majorTaskID,
+                            "value": taskID,
                         },
                     },
                 },
-                {
-                    "bool": util.MapStr{
-                        "minimum_should_match": 1,
-                        "should": []util.MapStr{
-                            {
-                                "term": util.MapStr{
-                                    "metadata.labels.pipeline_id": util.MapStr{
-                                        "value": "dump_hash",
-                                    },
-                                },
-                            },
-                            {
-                                "bool": util.MapStr{
-                                    "must": []util.MapStr{
                 {
                     "term": util.MapStr{
                         "metadata.type": util.MapStr{
@@ -186,56 +150,43 @@ func (h *APIHandler) getComparisonMajorTaskInfo(majorTaskID string) (taskStats m
                         },
                     },
                 },
-                            },
-                        },
-                    },
-                },
-            },
         },
     }
-    q := &orm.Query{
-        RawQuery: util.MustToJSONBytes(taskQuery),
-    }
-    err, result := orm.Search(task.Task{}, q)
+    subTasks, err := migration_util.GetTasks(taskQuery)
     if err != nil {
         return taskStats, indexState, err
     }
 
-    var pipelineIndexNames = map[string]string{}
-    indexState = map[string]ComparisonIndexStateInfo{}
-    for _, row := range result.Result {
-        buf := util.MustToJSONBytes(row)
-        subTask := task.Task{}
-        err := util.FromJSONBytes(buf, &subTask)
-        if err != nil {
-            log.Errorf("failed to unmarshal task, err: %v", err)
-            continue
-        }
-        if subTask.Metadata.Labels == nil {
-            continue
-        }
+    for _, subTask := range subTasks {
         taskLabels := util.MapStr(subTask.Metadata.Labels)
         indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
         if indexName == "" {
             continue
         }
 
-        // add indexDocs of already complete/error
-        if subTask.Metadata.Type == "index_comparison" {
+        cfg := migration_model.IndexComparisonTaskConfig{}
+        err = migration_util.GetTaskConfig(&subTask, &cfg)
+        if err != nil {
+            log.Errorf("failed to get task config, err: %v", err)
+            continue
+        }
         sourceDocs := migration_util.GetMapIntValue(taskLabels, "source_scrolled")
         targetDocs := migration_util.GetMapIntValue(taskLabels, "target_scrolled")
+        totalDiffDocs := migration_util.GetMapIntValue(taskLabels, "total_diff_docs")
+        taskStats.SourceTotalDocs += cfg.Source.DocCount
         taskStats.SourceScrollDocs += sourceDocs
+        taskStats.TargetTotalDocs += cfg.Target.DocCount
         taskStats.TargetScrollDocs += targetDocs
+        taskStats.TotalDiffDocs += totalDiffDocs
         st := indexState[indexName]
+        st.SourceTotalDocs += cfg.Source.DocCount
         st.SourceScrollDocs += sourceDocs
+        st.TargetTotalDocs += cfg.Target.DocCount
         st.TargetScrollDocs += targetDocs
+        st.TotalDiffDocs += totalDiffDocs
         if subTask.Status == task.StatusError {
             st.ErrorPartitions += 1
         }
         indexState[indexName] = st
-        continue
-        }
-        pipelineIndexNames[subTask.ID] = indexName
     }
 
     return taskStats, indexState, nil
@ -252,12 +203,21 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taskConfig := &migration_model.ClusterComparisonTaskConfig{}
|
||||||
|
err = migration_util.GetTaskConfig(&majorTask, taskConfig)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
h.WriteError(w, err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
taskInfo := &TaskInfoResponse{
|
taskInfo := &TaskInfoResponse{
|
||||||
TaskID: id,
|
TaskID: id,
|
||||||
StartTime: majorTask.StartTimeInMillis,
|
StartTime: majorTask.StartTimeInMillis,
|
||||||
|
Repeating: migration_util.IsRepeating(taskConfig.Settings.Execution.Repeat, majorTask.Metadata.Labels),
|
||||||
}
|
}
|
||||||
|
|
||||||
subTasks, pipelineTaskIDs, _, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(&majorTask, uniqueIndexName)
|
subTasks, pipelineTaskIDs, _, parentIDPipelineTasks, err := h.getChildTaskInfosByIndex(id, uniqueIndexName)
|
||||||
|
|
||||||
taskInfo.DataPartition = len(subTasks)
|
taskInfo.DataPartition = len(subTasks)
|
||||||
if len(subTasks) == 0 {
|
if len(subTasks) == 0 {
|
||||||
|
@ -266,7 +226,7 @@ func (h *APIHandler) getDataComparisonTaskOfIndex(w http.ResponseWriter, req *ht
|
||||||
}
|
}
|
||||||
|
|
||||||
pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
|
pipelineContexts := h.getChildPipelineInfosFromGateway(pipelineTaskIDs)
|
||||||
startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks)
|
startTime, completedTime, duration, completedPartitions := h.calcMajorTaskInfo(subTasks, taskInfo.Repeating)
|
||||||
|
|
||||||
var partitionTaskInfos []util.MapStr
|
var partitionTaskInfos []util.MapStr
|
||||||
|
|
||||||
|
|
|
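
Note on the comparison API hunks above: per-index totals now come from the sub-task configs (cfg.Source.DocCount / cfg.Target.DocCount), while the scrolled counts still come from the source_scrolled / target_scrolled labels, and both feed the new Source.Docs / Target.Docs response fields. The exact ScrollPercent formula is outside these hunks, so the sketch below is an assumption that only mirrors the visible cap at 100.

// Sketch only: one plausible way the per-index scroll percentage could be
// derived from the aggregated ComparisonIndexStateInfo; the real calculation
// lives in getDataComparisonTaskInfo and may differ.
package main

import "fmt"

type ComparisonIndexStateInfo struct {
    ErrorPartitions  int
    SourceTotalDocs  int64
    SourceScrollDocs int64
    TargetTotalDocs  int64
    TargetScrollDocs int64
    TotalDiffDocs    int64
}

func scrollPercent(st ComparisonIndexStateInfo) float64 {
    total := st.SourceTotalDocs + st.TargetTotalDocs
    if total == 0 {
        return 0
    }
    percent := float64(st.SourceScrollDocs+st.TargetScrollDocs) * 100 / float64(total)
    if percent > 100 { // mirrors the cap applied in the handler above
        percent = 100
    }
    return percent
}

func main() {
    st := ComparisonIndexStateInfo{SourceTotalDocs: 1000, SourceScrollDocs: 600, TargetTotalDocs: 1000, TargetScrollDocs: 400}
    fmt.Printf("scrolled %.2f%%\n", scrollPercent(st)) // 50.00%
}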
@@ -63,7 +63,7 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
 return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
 }

-err = migration_util.DeleteChildTasks(taskItem.ID)
+err = migration_util.DeleteChildTasks(taskItem.ID, "pipeline")
 if err != nil {
 log.Warnf("failed to clear child tasks, err: %v", err)
 return nil

@@ -344,6 +344,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
 taskItem.Metadata.Labels["target_scrolled"] = targetDocs
 if sourceDocs != targetDocs {
 now := time.Now()
+taskItem.Metadata.Labels["total_diff_docs"] = sourceDocs
 taskItem.CompletedTime = &now
 taskItem.Status = task.StatusError
 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{

@@ -372,6 +373,7 @@ func (p *processor) handleRunningSubTask(taskItem *task.Task) error {
 )
 if onlyInSource > 0 || onlyInTarget > 0 || diffBoth > 0 {
 now := time.Now()
+taskItem.Metadata.Labels["total_diff_docs"] = onlyInSource + onlyInTarget + diffBoth
 taskItem.CompletedTime = &now
 taskItem.Status = task.StatusError
 p.saveTaskAndWriteLog(taskItem, &task.TaskResult{

@@ -416,7 +418,7 @@ func (p *processor) handlePendingStopSubTask(taskItem *task.Task) error {
 return nil
 }

-tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline")
+tasks, err := migration_util.GetPendingChildTasks(taskItem.ID, "pipeline")
 if err != nil {
 log.Errorf("failed to get sub tasks, err: %v", err)
 return nil

@@ -431,7 +433,7 @@ func (p *processor) handlePendingStopSubTask(taskItem *task.Task) error {
 }

 func (p *processor) getPipelineTasks(taskItem *task.Task, cfg *migration_model.IndexComparisonTaskConfig) (sourceDumpTask *task.Task, targetDumpTask *task.Task, diffTask *task.Task, err error) {
-ptasks, err := migration_util.GetChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline", nil)
+ptasks, err := migration_util.GetChildTasks(taskItem.ID, "pipeline", nil)
 if err != nil {
 log.Errorf("failed to get pipeline tasks, err: %v", err)
 return
@@ -53,7 +53,7 @@ func (p *processor) handlePendingStopSubTask(taskItem *task.Task) error {
 return nil
 }

-tasks, err := migration_util.GetPendingChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline")
+tasks, err := migration_util.GetPendingChildTasks(taskItem.ID, "pipeline")
 if err != nil {
 log.Errorf("failed to get sub tasks, err: %v", err)
 return nil

@@ -93,7 +93,7 @@ func (p *processor) handleSplitSubTask(taskItem *task.Task) error {
 return fmt.Errorf("got wrong parent id of task [%v]", *taskItem)
 }

-err = migration_util.DeleteChildTasks(taskItem.ID)
+err = migration_util.DeleteChildTasks(taskItem.ID, "pipeline")
 if err != nil {
 log.Warnf("failed to clear child tasks, err: %v", err)
 return nil

@@ -474,7 +474,7 @@ func (p *processor) checkBulkPipelineTaskStatus(bulkTask *task.Task, cfg *migrat
 }

 func (p *processor) getExecutionConfigFromMajorTask(taskItem *task.Task) (config migration_model.ExecutionConfig, err error) {
-majorTaskID := taskItem.ParentId[0]
+majorTaskID := migration_util.GetDirectParentId(taskItem.ParentId)
 majorTask := task.Task{}
 majorTask.ID = majorTaskID
 _, err = orm.Get(&majorTask)

@@ -493,7 +493,7 @@ func (p *processor) getExecutionConfigFromMajorTask(taskItem *task.Task) (config
 }

 func (p *processor) getScrollBulkPipelineTasks(taskItem *task.Task) (scrollTask *task.Task, bulkTask *task.Task, err error) {
-ptasks, err := migration_util.GetChildTasks(p.Elasticsearch, p.IndexName, taskItem.ID, "pipeline", nil)
+ptasks, err := migration_util.GetChildTasks(taskItem.ID, "pipeline", nil)
 if err != nil {
 log.Errorf("failed to get pipeline tasks, err: %v", err)
 return
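
The two processor files above switch to the slimmer util helpers: DeleteChildTasks, GetPendingChildTasks and GetChildTasks no longer take the Elasticsearch cluster and index name, and the major task is now resolved with migration_util.GetDirectParentId instead of taskItem.ParentId[0]. A minimal mirror of that parent-ID convention follows; the ordering of the slice is an assumption inferred from the helper's definition further down.

package main

import "fmt"

// getDirectParentId mirrors migration_util.GetDirectParentId: the direct parent
// is taken from the last entry of the parent_id chain, not the first.
func getDirectParentId(parentIDs []string) string {
    if len(parentIDs) == 0 {
        return ""
    }
    return parentIDs[len(parentIDs)-1]
}

func main() {
    // e.g. a pipeline sub-task whose chain is root cluster task -> index task
    parents := []string{"cluster-task-id", "index-task-id"}
    fmt.Println(getDirectParentId(parents)) // index-task-id
}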
@@ -1,19 +1,48 @@
 package migration

 import (
+"net/http"
+
 log "github.com/cihub/seelog"

+"infini.sh/console/plugin/migration/cluster_migration"
 migration_model "infini.sh/console/plugin/migration/model"
 migration_util "infini.sh/console/plugin/migration/util"

-"infini.sh/framework/core/orm"
+"infini.sh/framework/core/api/rbac"
+httprouter "infini.sh/framework/core/api/router"
 "infini.sh/framework/core/task"
 "infini.sh/framework/core/util"
 )

+func (h *APIHandler) createDataMigrationTask(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+clusterTaskConfig := &migration_model.ClusterMigrationTaskConfig{}
+err := h.DecodeJSON(req, clusterTaskConfig)
+if err != nil {
+log.Error(err)
+h.WriteError(w, err.Error(), http.StatusInternalServerError)
+return
+}
+user, err := rbac.FromUserContext(req.Context())
+if err != nil {
+log.Error(err)
+h.WriteError(w, err.Error(), http.StatusInternalServerError)
+return
+}
+t, err := cluster_migration.CreateTask(clusterTaskConfig, user)
+if err != nil {
+log.Error(err)
+h.WriteError(w, err.Error(), http.StatusInternalServerError)
+return
+}
+
+h.WriteCreatedOKJSON(w, t.ID)
+}
+
 type MigrationIndexStateInfo struct {
 ErrorPartitions int
 IndexDocs int64
+SourceDocs int64
 }

 /*

@@ -22,7 +51,7 @@ We count data from two sources:
 - plus index_migration.index_docs with realtime bulk indexing info
 - realtime bulk indexing info is only available for running index_migrations
 */
-func (h *APIHandler) getMigrationMajorTaskInfo(majorTaskID string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) {
+func (h *APIHandler) getMigrationMajorTaskInfo(id string) (taskStats migration_model.ClusterMigrationTaskState, indexState map[string]MigrationIndexStateInfo, err error) {
 taskQuery := util.MapStr{
 "size": 500,
 "query": util.MapStr{

@@ -31,24 +60,10 @@ func (h *APIHandler) getMigrationMajorTaskInfo(majorTaskID string) (taskStats mi
 {
 "term": util.MapStr{
 "parent_id": util.MapStr{
-"value": majorTaskID,
+"value": id,
 },
 },
 },
-{
-"bool": util.MapStr{
-"minimum_should_match": 1,
-"should": []util.MapStr{
-{
-"term": util.MapStr{
-"metadata.labels.pipeline_id": util.MapStr{
-"value": "bulk_indexing",
-},
-},
-},
-{
-"bool": util.MapStr{
-"must": []util.MapStr{
 {
 "term": util.MapStr{
 "metadata.type": util.MapStr{

@@ -56,61 +71,84 @@ func (h *APIHandler) getMigrationMajorTaskInfo(majorTaskID string) (taskStats mi
 },
 },
 },
-{
-"terms": util.MapStr{
-"status": []string{task.StatusComplete, task.StatusError},
-},
-},
-},
-},
-},
-},
-},
-},
 },
 },
 },
 }
-q := &orm.Query{
-RawQuery: util.MustToJSONBytes(taskQuery),
-}
-err, result := orm.Search(task.Task{}, q)
+subTasks, err := migration_util.GetTasks(taskQuery)
 if err != nil {
 return taskStats, indexState, err
 }

-var pipelineTaskIDs = map[string][]string{}
-var pipelineIndexNames = map[string]string{}
+var indexMigrationTaskIDs []string
 indexState = map[string]MigrationIndexStateInfo{}
-for _, row := range result.Result {
-buf := util.MustToJSONBytes(row)
-subTask := task.Task{}
-err := util.FromJSONBytes(buf, &subTask)
-if err != nil {
-log.Errorf("failed to unmarshal task, err: %v", err)
-continue
-}
-if subTask.Metadata.Labels == nil {
-continue
-}
+for _, subTask := range subTasks {
 taskLabels := util.MapStr(subTask.Metadata.Labels)
 indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
 if indexName == "" {
 continue
 }

-// add indexDocs of already complete/error
-if subTask.Metadata.Type == "index_migration" {
-indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs")
-taskStats.IndexDocs += indexDocs
-st := indexState[indexName]
-st.IndexDocs += indexDocs
-if subTask.Status == task.StatusError {
-st.ErrorPartitions += 1
-}
-indexState[indexName] = st
+if subTask.Status == task.StatusRunning {
+indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
 continue
 }

+cfg := migration_model.IndexMigrationTaskConfig{}
+err = migration_util.GetTaskConfig(&subTask, &cfg)
+if err != nil {
+log.Errorf("failed to get task config, err: %v", err)
+continue
+}
+indexDocs := migration_util.GetMapIntValue(taskLabels, "index_docs")
+taskStats.IndexDocs += indexDocs
+taskStats.SourceDocs += cfg.Source.DocCount
+st := indexState[indexName]
+st.IndexDocs += indexDocs
+st.SourceDocs += cfg.Source.DocCount
+if subTask.Status == task.StatusError {
+st.ErrorPartitions += 1
+taskStats.ErrorPartitions += 1
+}
+indexState[indexName] = st
+indexMigrationTaskIDs = append(indexMigrationTaskIDs, subTask.ID)
+}
+
+taskQuery = util.MapStr{
+"size": 500,
+"query": util.MapStr{
+"bool": util.MapStr{
+"must": []util.MapStr{
+{
+"terms": util.MapStr{
+"parent_id": indexMigrationTaskIDs,
+},
+},
+{
+"term": util.MapStr{
+"metadata.labels.pipeline_id": util.MapStr{
+"value": "bulk_indexing",
+},
+},
+},
+},
+},
+},
+}
+subTasks, err = migration_util.GetTasks(taskQuery)
+if err != nil {
+return taskStats, indexState, err
+}
+
+var pipelineTaskIDs = map[string][]string{}
+var pipelineIndexNames = map[string]string{}
+for _, subTask := range subTasks {
+taskLabels := util.MapStr(subTask.Metadata.Labels)
+indexName := migration_util.GetMapStringValue(taskLabels, "unique_index_name")
+if indexName == "" {
+continue
+}
+
 pipelineIndexNames[subTask.ID] = indexName

 if instID := migration_util.GetMapStringValue(taskLabels, "execution_instance_id"); instID != "" {
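
getMigrationMajorTaskInfo above now aggregates in two passes: the first query walks the index_migration children (doc counts from their configs plus the index_docs label), and the second collects the bulk_indexing pipeline tasks whose parent_id is one of the gathered index-migration IDs. With the new SourceDocs and ErrorPartitions fields a caller could derive an overall progress figure; the percentage below is only an assumed illustration, not something these hunks define.

// Assumed illustration: deriving a progress ratio from the extended
// ClusterMigrationTaskState; field names match the model hunk further down.
package main

import "fmt"

type ClusterMigrationTaskState struct {
    IndexDocs       int64
    SourceDocs      int64
    ErrorPartitions int64
    Status          string
}

func progressPercent(st ClusterMigrationTaskState) float64 {
    if st.SourceDocs == 0 {
        return 0
    }
    return float64(st.IndexDocs) * 100 / float64(st.SourceDocs)
}

func main() {
    st := ClusterMigrationTaskState{IndexDocs: 750000, SourceDocs: 1000000, ErrorPartitions: 1}
    fmt.Printf("indexed %.1f%% of source docs, %d failed partitions\n", progressPercent(st), st.ErrorPartitions)
}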
@@ -1,9 +1,15 @@
 package model

-import "fmt"
+import (
+"fmt"
+"time"
+
+"infini.sh/framework/core/util"
+)

 type ExecutionConfig struct {
 TimeWindow []TimeWindowItem `json:"time_window"`
+Repeat *Repeat `json:"repeat"`
 Nodes struct {
 Permit []ExecutionNode `json:"permit"`
 } `json:"nodes"`

@@ -14,6 +20,12 @@ type ExecutionNode struct {
 Name string `json:"name"`
 }

+type Repeat struct {
+NextRunTime *time.Time `json:"next_run_time"`
+Interval time.Duration `json:"interval"`
+TotalRun int64 `json:"total_run"`
+}
+
 type TimeWindowItem struct {
 Start string `json:"start"`
 End string `json:"end"`

@@ -25,6 +37,14 @@ type IndexPartition struct {
 Step interface{} `json:"step"`
 }

+type IndexIncremental struct {
+FieldName string `json:"field_name"`
+// Optional, data ingest delay
+Delay time.Duration `json:"delay"`
+// If full, run the data from -inf, else from current - step
+Full bool `json:"full"`
+}
+
 type IndexInfo struct {
 Name string `json:"name"`
 DocType string `json:"doc_type"`

@@ -42,3 +62,23 @@ type ClusterInfo struct {
 Name string `json:"name"`
 Distribution string `json:"distribution,omitempty"`
 }
+
+// BuildFilter generate a query filter, used by split task
+func (incremental *IndexIncremental) BuildFilter(current int64, step time.Duration) (util.MapStr, error) {
+if incremental == nil {
+return util.MapStr{}, nil
+}
+
+rv := util.MapStr{
+"lt": int64(current) - incremental.Delay.Milliseconds(),
+"format": "epoch_millis",
+}
+if !incremental.Full {
+rv["gte"] = int64(current) - step.Milliseconds() - incremental.Delay.Milliseconds()
+}
+return util.MapStr{
+"range": util.MapStr{
+incremental.FieldName: rv,
+},
+}, nil
+}
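
BuildFilter above turns an incremental index config into a range query over the incremental field: the upper bound is always current minus the ingest delay, and unless Full is set the lower bound is one step earlier. Below is a small mirror with plain maps, so the produced filter can be printed without the framework's util.MapStr type.

package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// buildFilter mirrors IndexIncremental.BuildFilter with plain maps.
func buildFilter(field string, delay, step time.Duration, full bool, current int64) map[string]interface{} {
    rv := map[string]interface{}{
        "lt":     current - delay.Milliseconds(),
        "format": "epoch_millis",
    }
    if !full {
        // only the last window is re-scanned: [current-step-delay, current-delay)
        rv["gte"] = current - step.Milliseconds() - delay.Milliseconds()
    }
    return map[string]interface{}{"range": map[string]interface{}{field: rv}}
}

func main() {
    now := time.Now().UnixMilli()
    filter := buildFilter("timestamp", 5*time.Minute, time.Hour, false, now)
    out, _ := json.MarshalIndent(filter, "", "  ")
    fmt.Println(string(out))
}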
@@ -1,6 +1,8 @@
 package model

-import "infini.sh/framework/core/util"
+import (
+"infini.sh/framework/core/util"
+)

 type ClusterComparisonTaskConfig struct {
 Cluster struct {

@@ -23,6 +25,7 @@ type ClusterComparisonIndexConfig struct {
 Source IndexInfo `json:"source"`
 Target IndexInfo `json:"target"`
 RawFilter interface{} `json:"raw_filter"`
+Incremental *IndexIncremental `json:"incremental"`
 Partition *IndexPartition `json:"partition,omitempty"`

 // only used in API

@@ -30,12 +33,6 @@ type ClusterComparisonIndexConfig struct {
 ErrorPartitions int `json:"error_partitions,omitempty"`
 }

-type ClusterComparisonTaskState struct {
-SourceScrollDocs int64
-TargetScrollDocs int64
-Status string
-}
-
 type IndexComparisonTaskConfig struct {
 Source IndexComparisonDumpConfig `json:"source"`
 Target IndexComparisonDumpConfig `json:"target"`
@@ -33,6 +33,7 @@ type ClusterMigrationIndexConfig struct {
 RawFilter interface{} `json:"raw_filter"`
 IndexRename map[string]interface{} `json:"index_rename"`
 TypeRename map[string]interface{} `json:"type_rename"`
+Incremental *IndexIncremental `json:"incremental"`
 Partition *IndexPartition `json:"partition,omitempty"`

 // only used in API

@@ -42,6 +43,8 @@ type ClusterMigrationIndexConfig struct {

 type ClusterMigrationTaskState struct {
 IndexDocs int64
+SourceDocs int64
+ErrorPartitions int64
 Status string
 }
@@ -110,6 +110,10 @@ func (p *DispatcherProcessor) Name() string {
 }

 func (p *DispatcherProcessor) Process(ctx *pipeline.Context) error {
+// handle repeating tasks
+p.handleRepeatingTasks(ctx, "cluster_comparison")
+p.handleRepeatingTasks(ctx, "cluster_migration")
+
 // handle pipeline task
 p.handleTasks(ctx, "pipeline", []string{task2.StatusReady, task2.StatusRunning, task2.StatusPendingStop}, p.pipelineTaskProcessor.Process)

@@ -203,7 +207,7 @@ func (p *DispatcherProcessor) getMigrationTasks(taskType string, taskStatus []st
 },
 },
 }
-return migration_util.GetTasks(p.config.Elasticsearch, p.config.IndexName, queryDsl)
+return migration_util.GetTasks(queryDsl)
 }

 func (p *DispatcherProcessor) saveTaskAndWriteLog(taskItem *task2.Task, taskResult *task2.TaskResult, message string) {
@@ -0,0 +1,130 @@
+package migration
+
+import (
+"errors"
+"fmt"
+"time"
+
+log "github.com/cihub/seelog"
+"infini.sh/console/plugin/migration/cluster_comparison"
+"infini.sh/console/plugin/migration/cluster_migration"
+migration_util "infini.sh/console/plugin/migration/util"
+"infini.sh/framework/core/pipeline"
+"infini.sh/framework/core/task"
+"infini.sh/framework/core/util"
+)
+
+func (p *DispatcherProcessor) handleRepeatingTasks(ctx *pipeline.Context, taskType string) {
+tasks, err := p.getPendingExecutionTasks(taskType, p.config.TaskBatchSize)
+if err != nil {
+log.Errorf("failed to get pending [%s] tasks, err: %v", taskType, err)
+return
+}
+if len(tasks) == 0 {
+return
+}
+log.Debugf("handling pending [%s] tasks, count: %d", taskType, len(tasks))
+// refresh index after each batch
+defer func() {
+p.refreshTask()
+}()
+for i := range tasks {
+if ctx.IsCanceled() {
+return
+}
+taskItem := &tasks[i]
+err := p.handleTask(taskItem, p.handleRepeatingTask)
+if err != nil {
+log.Errorf("failed to handle task [%s]: [%v]", taskItem.ID, err)
+
+taskItem.Status = task.StatusError
+tn := time.Now()
+taskItem.CompletedTime = &tn
+p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+Success: false,
+Error: err.Error(),
+}, fmt.Sprintf("failed to handle task [%s]", taskItem.ID))
+}
+}
+return
+}
+
+func (p *DispatcherProcessor) getPendingExecutionTasks(taskType string, size int) ([]task.Task, error) {
+queryDsl := util.MapStr{
+"size": size,
+"sort": []util.MapStr{
+{
+"created": util.MapStr{
+"order": "asc",
+},
+},
+},
+"query": util.MapStr{
+"bool": util.MapStr{
+"must": []util.MapStr{
+{
+"term": util.MapStr{
+"metadata.type": taskType,
+},
+},
+{
+"term": util.MapStr{
+"metadata.labels.repeat_triggered": util.MapStr{
+"value": false,
+},
+},
+},
+{
+"range": util.MapStr{
+"metadata.labels.next_run_time": util.MapStr{
+"lte": time.Now().UnixMilli(),
+},
+},
+},
+},
+},
+},
+}
+return migration_util.GetTasks(queryDsl)
+}
+
+// NOTE: we handle repeating in two iterations:
+// - the first iteration will mark the task as ready to run
+// - the second iteration will trigger the next repeat, and update the status accordingly
+// This will make the second step automatically retryable
+func (p *DispatcherProcessor) handleRepeatingTask(taskItem *task.Task) error {
+if taskItem.Status == task.StatusInit {
+taskItem.Status = task.StatusReady
+p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+Success: true,
+}, fmt.Sprintf("task started automatically"))
+return nil
+}
+repeatDone := migration_util.GetMapBoolValue(taskItem.Metadata.Labels, "repeat_done")
+if repeatDone {
+taskItem.Metadata.Labels["repeat_triggered"] = true
+p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+Success: true,
+}, fmt.Sprintf("task repeat ended"))
+return nil
+}
+var nextTask *task.Task
+var err error
+switch taskItem.Metadata.Type {
+case "cluster_migration":
+nextTask, err = cluster_migration.RepeatTask(taskItem)
+case "cluster_comparison":
+nextTask, err = cluster_comparison.RepeatTask(taskItem)
+default:
+return errors.New("invalid type")
+}
+if err != nil {
+log.Errorf("failed to repeat task [%s], err: %v", taskItem.ID, err)
+return nil
+}
+taskItem.Metadata.Labels["repeat_triggered"] = true
+p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
+Success: true,
+}, fmt.Sprintf("next repeat task [%s] created", nextTask.ID))
+return nil
+}
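
The repeat dispatcher above is driven by three task labels: next_run_time, repeat_triggered and repeat_done. getPendingExecutionTasks only picks up tasks whose next_run_time has passed and whose repeat_triggered is still false; handleRepeatingTask then either marks an init task ready, ends the chain when repeat_done is set, or creates the next run via RepeatTask. A condensed, in-memory sketch of the selection rule (the real check is the Elasticsearch query above):

package main

import (
    "fmt"
    "time"
)

// pendingExecution restates the query conditions of getPendingExecutionTasks
// as a plain predicate over the task labels.
func pendingExecution(labels map[string]interface{}, now time.Time) bool {
    triggered, _ := labels["repeat_triggered"].(bool)
    next, _ := labels["next_run_time"].(int64)
    // a task with no next_run_time label would not match the range query at all
    return !triggered && next != 0 && next <= now.UnixMilli()
}

func main() {
    now := time.Now()
    due := map[string]interface{}{"repeat_triggered": false, "next_run_time": now.Add(-time.Minute).UnixMilli()}
    done := map[string]interface{}{"repeat_triggered": true, "next_run_time": now.Add(-time.Minute).UnixMilli()}
    fmt.Println(pendingExecution(due, now))  // true: due and not yet triggered
    fmt.Println(pendingExecution(done, now)) // false: already triggered
}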
@@ -4,13 +4,13 @@ import (
 "fmt"

 log "github.com/cihub/seelog"
-"infini.sh/framework/core/elastic"
 "infini.sh/framework/core/orm"
 "infini.sh/framework/core/task"
 "infini.sh/framework/core/util"
 )

-func DeleteChildTasks(taskID string) error {
+func DeleteChildTasks(taskID string, taskType string) error {
 q := util.MapStr{
 "query": util.MapStr{
 "bool": util.MapStr{

@@ -22,6 +22,11 @@ func DeleteChildTasks(taskID string) error {
 },
 },
 },
+{
+"term": util.MapStr{
+"metadata.type": taskType,
+},
+},
 },
 },
 },

@@ -33,11 +38,78 @@ func DeleteChildTasks(taskID string) error {
 return nil
 }

-func GetPendingChildTasks(elasticsearch, indexName string, taskID string, taskType string) ([]task.Task, error) {
-return GetChildTasks(elasticsearch, indexName, taskID, taskType, []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady})
+func GetLastRepeatingChildTask(taskID string, taskType string) (*task.Task, error) {
+queryDsl := util.MapStr{
+"size": 1,
+"sort": []util.MapStr{
+{
+"metadata.labels.next_run_time": util.MapStr{
+"order": "desc",
+},
+},
+},
+"query": util.MapStr{
+"bool": util.MapStr{
+"must": []util.MapStr{
+{
+"term": util.MapStr{
+"metadata.type": taskType,
+},
+},
+{
+"term": util.MapStr{
+"parent_id": util.MapStr{
+"value": taskID,
+},
+},
+},
+},
+},
+},
+}
+tasks, err := GetTasks(queryDsl)
+if err != nil {
+return nil, err
+}
+if len(tasks) == 0 {
+return nil, nil
+}
+return &tasks[0], nil
 }

-func GetChildTasks(elasticsearch, indexName string, taskID string, taskType string, status []string) ([]task.Task, error) {
+func GetPendingChildTasks(taskID string, taskType string) ([]task.Task, error) {
+return GetChildTasks(taskID, taskType, []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady})
+}
+
+func CountRunningChildren(taskID string, taskType string) (int64, error) {
+return CountTasks(util.MapStr{
+"query": util.MapStr{
+"bool": util.MapStr{
+"must": []util.MapStr{
+{
+"term": util.MapStr{
+"metadata.type": taskType,
+},
+},
+{
+"term": util.MapStr{
+"parent_id": util.MapStr{
+"value": taskID,
+},
+},
+},
+{
+"terms": util.MapStr{
+"status": []string{task.StatusRunning, task.StatusPendingStop, task.StatusReady},
+},
+},
+},
+},
+},
+})
+}
+
+func GetChildTasks(taskID string, taskType string, status []string) ([]task.Task, error) {
 musts := []util.MapStr{
 {
 "term": util.MapStr{

@@ -67,22 +139,27 @@ func GetChildTasks(elasticsearch, indexName string, taskID string, taskType stri
 },
 },
 }
-return GetTasks(elasticsearch, indexName, queryDsl)
+return GetTasks(queryDsl)
 }

-func GetTasks(elasticsearch, indexName string, query util.MapStr) ([]task.Task, error) {
-esClient := elastic.GetClient(elasticsearch)
-res, err := esClient.SearchWithRawQueryDSL(indexName, util.MustToJSONBytes(query))
+func CountTasks(query util.MapStr) (int64, error) {
+return orm.Count(task.Task{}, util.MustToJSONBytes(query))
+}
+
+func GetTasks(query util.MapStr) ([]task.Task, error) {
+err, res := orm.Search(task.Task{}, &orm.Query{
+RawQuery: util.MustToJSONBytes(query),
+})
 if err != nil {
 log.Errorf("query tasks from es failed, err: %v", err)
 return nil, err
 }
-if res.GetTotal() == 0 {
+if res.Total == 0 {
 return nil, nil
 }
 var tasks []task.Task
-for _, hit := range res.Hits.Hits {
-buf, err := util.ToJSONBytes(hit.Source)
+for _, row := range res.Result {
+buf, err := util.ToJSONBytes(row)
 if err != nil {
 log.Errorf("marshal task json failed, err: %v", err)
 return nil, err

@@ -93,6 +170,9 @@ func GetTasks(elasticsearch, indexName string, query util.MapStr) ([]task.Task,
 log.Errorf("unmarshal task json failed, err: %v", err)
 return nil, err
 }
+if tk.Metadata.Labels == nil {
+continue
+}
 tasks = append(tasks, tk)
 }
 return tasks, nil
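
The util hunks above drop the raw elastic client in favour of the orm package, so every helper now takes only the task ID, task type and query. For reference, this is the query body CountRunningChildren sends to orm.Count, rebuilt with plain maps; the literal status strings stand in for task.StatusRunning, task.StatusPendingStop and task.StatusReady and are assumptions about their serialized values.

package main

import (
    "encoding/json"
    "fmt"
)

// runningChildrenQuery rebuilds the DSL assembled by CountRunningChildren.
func runningChildrenQuery(taskID, taskType string) map[string]interface{} {
    return map[string]interface{}{
        "query": map[string]interface{}{
            "bool": map[string]interface{}{
                "must": []interface{}{
                    map[string]interface{}{"term": map[string]interface{}{"metadata.type": taskType}},
                    map[string]interface{}{"term": map[string]interface{}{"parent_id": map[string]interface{}{"value": taskID}}},
                    // assumed string values of the task status constants
                    map[string]interface{}{"terms": map[string]interface{}{"status": []string{"running", "pending_stop", "ready"}}},
                },
            },
        },
    }
}

func main() {
    out, _ := json.MarshalIndent(runningChildrenQuery("task-1", "index_migration"), "", "  ")
    fmt.Println(string(out))
}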
@@ -0,0 +1,75 @@
+package util
+
+import (
+"time"
+
+migration_model "infini.sh/console/plugin/migration/model"
+"infini.sh/framework/core/util"
+)
+
+func UpdateRepeatState(repeat *migration_model.Repeat, labels util.MapStr) error {
+if labels == nil {
+return nil
+}
+
+if !isValidRepeat(repeat) {
+labels["repeat_done"] = true
+return nil
+}
+labels["is_repeat"] = true
+
+runTimes := GetMapIntValue(labels, "run_times")
+runTimes += 1
+labels["run_times"] = runTimes
+
+if repeat.TotalRun >= 1 && runTimes >= repeat.TotalRun {
+labels["repeat_done"] = true
+}
+if _, ok := labels["next_run_time"]; !ok {
+if repeat.NextRunTime != nil {
+labels["next_run_time"] = repeat.NextRunTime.UnixMilli()
+}
+} else {
+nextRunTime := GetMapIntValue(labels, "next_run_time")
+labels["next_run_time"] = nextRunTime + repeat.Interval.Milliseconds()
+}
+labels["repeat_triggered"] = false
+return nil
+}
+
+func CopyRepeatState(oldLabels, newLabels util.MapStr) {
+newLabels["run_times"] = oldLabels["run_times"]
+newLabels["next_run_time"] = oldLabels["next_run_time"]
+}
+
+func IsRepeating(repeat *migration_model.Repeat, labels map[string]interface{}) bool {
+if !isValidRepeat(repeat) {
+return false
+}
+if repeat.TotalRun < 1 {
+return true
+}
+if repeat.TotalRun == 1 {
+return false
+}
+nextRunTime := GetMapIntValue(labels, "next_run_time")
+// not started yet
+if nextRunTime == 0 {
+return false
+}
+endTime := time.UnixMilli(nextRunTime).Add(time.Duration(repeat.TotalRun-1) * repeat.Interval)
+if time.Now().Before(endTime) {
+return true
+}
+return false
+}
+
+func isValidRepeat(repeat *migration_model.Repeat) bool {
+if repeat == nil {
+return false
+}
+if repeat.Interval < time.Minute {
+return false
+}
+return true
+}
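
A hypothetical wiring of the new repeat helpers above, assuming the infini.sh/console and infini.sh/framework modules are on the import path; RepeatTask (not shown here) is presumably where UpdateRepeatState and CopyRepeatState get called when the next run is created.

package main

import (
    "fmt"
    "time"

    migration_model "infini.sh/console/plugin/migration/model"
    migration_util "infini.sh/console/plugin/migration/util"
    "infini.sh/framework/core/util"
)

func main() {
    next := time.Now().Add(time.Hour)
    repeat := &migration_model.Repeat{
        NextRunTime: &next,
        Interval:    24 * time.Hour, // intervals below time.Minute are treated as invalid
        TotalRun:    3,              // a value below 1 means repeat indefinitely
    }

    labels := util.MapStr{}
    // Bumps run_times, schedules next_run_time and resets repeat_triggered;
    // marks repeat_done once run_times reaches TotalRun.
    _ = migration_util.UpdateRepeatState(repeat, labels)
    fmt.Println(labels["run_times"], labels["next_run_time"])

    // Used by the task-info APIs to report whether the task is still repeating.
    fmt.Println(migration_util.IsRepeating(repeat, labels))
}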
@@ -47,6 +47,13 @@ func IsPendingState(status string) bool {
 return util.StringInArray(pendingTaskStatus, status)
 }

+func GetDirectParentId(parentIDs []string) string {
+if len(parentIDs) == 0 {
+return ""
+}
+return parentIDs[len(parentIDs)-1]
+}
+
 func GetTaskConfig(task *task.Task, config interface{}) error {
 if task.Config_ == nil {
 return util.FromJSONBytes([]byte(task.ConfigString), config)

@@ -71,6 +78,19 @@ func GetMapIntValue(m util.MapStr, key string) int64 {
 return vv
 }

+func GetMapBoolValue(m util.MapStr, key string) bool {
+v, err := m.GetValue(key)
+if err != nil {
+return false
+}
+vv, err := util.ExtractBool(v)
+if err != nil {
+log.Errorf("got %s but failed to extract, err: %v", key, err)
+return false
+}
+return vv
+}
+
 func GetMapStringValue(m util.MapStr, key string) string {
 v, err := m.GetValue(key)
 if err != nil {
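
GetMapBoolValue above completes the GetMap*Value family: missing keys or values that cannot be extracted as booleans come back as false, which is why a task without a repeat_done label keeps repeating. A simplified mirror follows (the real helper goes through m.GetValue and util.ExtractBool, so it also handles nested keys and looser typing):

package main

import "fmt"

// getBool is a simplified stand-in for migration_util.GetMapBoolValue.
func getBool(m map[string]interface{}, key string) bool {
    v, ok := m[key]
    if !ok {
        return false // missing key defaults to false
    }
    b, ok := v.(bool)
    if !ok {
        return false // unextractable value also defaults to false
    }
    return b
}

func main() {
    labels := map[string]interface{}{"repeat_done": true}
    fmt.Println(getBool(labels, "repeat_done"))      // true
    fmt.Println(getBool(labels, "repeat_triggered")) // false: label not present
}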