[migration] clean all business_id checks
This commit is contained in:
parent
d37649faa2
commit
c89c8616b1
|
@ -121,7 +121,7 @@ func (h *APIHandler) searchDataMigrationTask(w http.ResponseWriter, req *http.Re
|
||||||
)
|
)
|
||||||
mustQ = append(mustQ, util.MapStr{
|
mustQ = append(mustQ, util.MapStr{
|
||||||
"term": util.MapStr{
|
"term": util.MapStr{
|
||||||
"metadata.labels.business_id": util.MapStr{
|
"metadata.type": util.MapStr{
|
||||||
"value": "cluster_migration",
|
"value": "cluster_migration",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -253,7 +253,7 @@ func (h *APIHandler) startDataMigration(w http.ResponseWriter, req *http.Request
|
||||||
Success: true,
|
Success: true,
|
||||||
}, "task status manually set to ready")
|
}, "task status manually set to ready")
|
||||||
|
|
||||||
if obj.Metadata.Labels != nil && obj.Metadata.Labels["business_id"] == "index_migration" && len(obj.ParentId) > 0 {
|
if obj.Metadata.Labels != nil && obj.Metadata.Type == "index_migration" && len(obj.ParentId) > 0 {
|
||||||
//update status of major task to running
|
//update status of major task to running
|
||||||
query := util.MapStr{
|
query := util.MapStr{
|
||||||
"bool": util.MapStr{
|
"bool": util.MapStr{
|
||||||
|
@ -492,7 +492,7 @@ func getMajorTaskInfoByIndex(taskID string) (map[string]migration_model.IndexSta
|
||||||
"must": []util.MapStr{
|
"must": []util.MapStr{
|
||||||
{
|
{
|
||||||
"term": util.MapStr{
|
"term": util.MapStr{
|
||||||
"metadata.labels.business_id": util.MapStr{
|
"metadata.type": util.MapStr{
|
||||||
"value": "index_migration",
|
"value": "index_migration",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -668,7 +668,7 @@ func (h *APIHandler) getDataMigrationTaskOfIndex(w http.ResponseWriter, req *htt
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if subTask.Metadata.Labels != nil {
|
if subTask.Metadata.Labels != nil {
|
||||||
if subTask.Metadata.Labels["business_id"] == "index_migration" {
|
if subTask.Metadata.Type == "index_migration" {
|
||||||
subTasks = append(subTasks, subTask)
|
subTasks = append(subTasks, subTask)
|
||||||
subTaskStatus[subTask.ID] = subTask.Status
|
subTaskStatus[subTask.ID] = subTask.Status
|
||||||
continue
|
continue
|
||||||
|
@ -1102,7 +1102,7 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod
|
||||||
"must": []util.MapStr{
|
"must": []util.MapStr{
|
||||||
{
|
{
|
||||||
"term": util.MapStr{
|
"term": util.MapStr{
|
||||||
"metadata.labels.business_id": util.MapStr{
|
"metadata.type": util.MapStr{
|
||||||
"value": "index_migration",
|
"value": "index_migration",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -1140,7 +1140,7 @@ func getMajorTaskStatsFromInstances(majorTaskID string) (taskStats migration_mod
|
||||||
}
|
}
|
||||||
if subTask.Metadata.Labels != nil {
|
if subTask.Metadata.Labels != nil {
|
||||||
//add indexDocs of already complete
|
//add indexDocs of already complete
|
||||||
if subTask.Metadata.Labels["business_id"] == "index_migration" {
|
if subTask.Metadata.Type == "index_migration" {
|
||||||
if v, ok := subTask.Metadata.Labels["index_docs"].(float64); ok {
|
if v, ok := subTask.Metadata.Labels["index_docs"].(float64); ok {
|
||||||
taskStats.IndexDocs += v
|
taskStats.IndexDocs += v
|
||||||
}
|
}
|
||||||
|
|
|
@ -209,7 +209,7 @@ func (p *DispatcherProcessor) handleReadyMajorTask(taskItem *task2.Task) error {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"term": util.MapStr{
|
"term": util.MapStr{
|
||||||
"metadata.labels.business_id": util.MapStr{
|
"metadata.type": util.MapStr{
|
||||||
"value": "index_migration",
|
"value": "index_migration",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -700,10 +700,6 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) erro
|
||||||
if taskItem.Metadata.Labels["is_split"] == true {
|
if taskItem.Metadata.Labels["is_split"] == true {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if taskItem.Metadata.Labels["business_id"] != "cluster_migration" {
|
|
||||||
log.Tracef("got unexpect task type of %s with task id [%s] in cluster migration processor", taskItem.Metadata.Type, taskItem.ID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
clusterMigrationTask := migration_model.ClusterMigrationTaskConfig{}
|
clusterMigrationTask := migration_model.ClusterMigrationTaskConfig{}
|
||||||
err := migration_util.GetTaskConfig(taskItem, &clusterMigrationTask)
|
err := migration_util.GetTaskConfig(taskItem, &clusterMigrationTask)
|
||||||
|
@ -1026,7 +1022,7 @@ func (p *DispatcherProcessor) getInstanceTaskState() (map[string]DispatcherState
|
||||||
"must": []util.MapStr{
|
"must": []util.MapStr{
|
||||||
{
|
{
|
||||||
"term": util.MapStr{
|
"term": util.MapStr{
|
||||||
"metadata.labels.business_id": util.MapStr{
|
"metadata.type": util.MapStr{
|
||||||
"value": "index_migration",
|
"value": "index_migration",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in New Issue