Merge branch 'master' of ssh://git.infini.ltd:64221/infini/console
commit 00c48bf9d5
@@ -7,41 +7,41 @@ package model
 import (
     "context"
     "fmt"
+    "net/http"
+    "time"
+
     "infini.sh/framework/core/agent"
     "infini.sh/framework/core/orm"
     "infini.sh/framework/core/util"
     "infini.sh/framework/modules/pipeline"
-    "net/http"
-    "time"
 )

-
 type Instance struct {
     orm.ORMObjectBase

     //InstanceID string `json:"instance_id,omitempty" elastic_mapping:"instance_id: { type: keyword }"`
-    Name string `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"`
-    Endpoint string `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"`
-    Version map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"`
-    BasicAuth agent.BasicAuth `config:"basic_auth" json:"basic_auth,omitempty" elastic_mapping:"basic_auth:{type:object}"`
-    Owner string `json:"owner,omitempty" config:"owner" elastic_mapping:"owner:{type:keyword}"`
-    Tags [] string `json:"tags,omitempty"`
-    Description string `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"`
+    Name        string                 `json:"name,omitempty" elastic_mapping:"name:{type:keyword,fields:{text: {type: text}}}"`
+    Endpoint    string                 `json:"endpoint,omitempty" elastic_mapping:"endpoint: { type: keyword }"`
+    Version     map[string]interface{} `json:"version,omitempty" elastic_mapping:"version: { type: object }"`
+    BasicAuth   agent.BasicAuth        `config:"basic_auth" json:"basic_auth,omitempty" elastic_mapping:"basic_auth:{type:object}"`
+    Owner       string                 `json:"owner,omitempty" config:"owner" elastic_mapping:"owner:{type:keyword}"`
+    Tags        []string               `json:"tags,omitempty"`
+    Description string                 `json:"description,omitempty" config:"description" elastic_mapping:"description:{type:keyword}"`
 }

 func (inst *Instance) CreatePipeline(body []byte) error {
     req := &util.Request{
         Method: http.MethodPost,
-        Body: body,
-        Url: inst.Endpoint + "/pipeline/tasks/",
+        Body:   body,
+        Url:    inst.Endpoint + "/pipeline/tasks/",
     }
     return inst.doRequest(req, nil)
 }

 func (inst *Instance) StopPipeline(ctx context.Context, pipelineID string) error {
     req := &util.Request{
-        Method: http.MethodPost,
-        Url: fmt.Sprintf("%s/pipeline/task/%s/_stop", inst.Endpoint, pipelineID),
+        Method:  http.MethodPost,
+        Url:     fmt.Sprintf("%s/pipeline/task/%s/_stop", inst.Endpoint, pipelineID),
         Context: ctx,
     }
     return inst.doRequest(req, nil)
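A minimal usage sketch of the Instance pipeline API shown above, assuming the caller imports this model package; the endpoint, pipeline body, and pipeline ID are placeholders, not values from the commit:

// Sketch only: drive a gateway instance through the methods defined above.
func runDemoPipeline(inst *model.Instance) error {
    body := []byte(`{"name": "demo_pipeline"}`) // hypothetical pipeline config
    if err := inst.CreatePipeline(body); err != nil {
        return fmt.Errorf("create pipeline failed: %w", err)
    }
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    // _stop is a plain HTTP POST; for blocking semantics the model also
    // exposes StopPipelineWithTimeout (see the next hunk).
    return inst.StopPipeline(ctx, "demo_pipeline")
}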
@@ -56,16 +56,15 @@ func (inst *Instance) StopPipelineWithTimeout(pipelineID string, duration time.Duration) error {
 func (inst *Instance) StartPipeline(pipelineID string) error {
     req := &util.Request{
         Method: http.MethodPost,
-        Url: fmt.Sprintf("%s/pipeline/task/%s/_start", inst.Endpoint, pipelineID),
+        Url:    fmt.Sprintf("%s/pipeline/task/%s/_start", inst.Endpoint, pipelineID),
     }
     return inst.doRequest(req, nil)
 }

-
 func (inst *Instance) DeletePipeline(pipelineID string) error {
     req := &util.Request{
         Method: http.MethodDelete,
-        Url: fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID),
+        Url:    fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID),
     }
     return inst.doRequest(req, nil)
 }
@@ -77,9 +76,9 @@ func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipelinesResponse, error) {
     ctx, cancel := context.WithTimeout(context.Background(), time.Second)
     defer cancel()
     req := &util.Request{
-        Method: http.MethodPost,
-        Url: fmt.Sprintf("%s/pipeline/tasks/_search", inst.Endpoint),
-        Body: body,
+        Method:  http.MethodPost,
+        Url:     fmt.Sprintf("%s/pipeline/tasks/_search", inst.Endpoint),
+        Body:    body,
         Context: ctx,
     }
     res := pipeline.GetPipelinesResponse{}
@@ -90,7 +89,18 @@ func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipelinesResponse, error) {
 func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error {
     req := &util.Request{
         Method: http.MethodDelete,
-        Url: fmt.Sprintf("%s/queue/_search", inst.Endpoint),
+        Url:    fmt.Sprintf("%s/queue/_search", inst.Endpoint),
         Body: util.MustToJSONBytes(util.MapStr{
             "selector": selector,
         }),
+    }
+    return inst.doRequest(req, nil)
+}
+
+func (inst *Instance) DeleteQueueConsumersBySelector(selector util.MapStr) error {
+    req := &util.Request{
+        Method: http.MethodDelete,
+        Url:    fmt.Sprintf("%s/queue/consumer/_search", inst.Endpoint),
+        Body: util.MustToJSONBytes(util.MapStr{
+            "selector": selector,
+        }),
@@ -100,8 +110,8 @@ func (inst *Instance) DeleteQueueBySelector(selector util.MapStr) error {

 func (inst *Instance) TryConnect(ctx context.Context) error {
     req := &util.Request{
-        Method: http.MethodGet,
-        Url: fmt.Sprintf("%s/_framework/api/_info", inst.Endpoint),
+        Method:  http.MethodGet,
+        Url:     fmt.Sprintf("%s/_framework/api/_info", inst.Endpoint),
         Context: ctx,
     }
     return inst.doRequest(req, nil)
@@ -125,4 +135,4 @@ func (inst *Instance) doRequest(req *util.Request, resBody interface{}) error {
         return util.FromJSONBytes(result.Body, resBody)
     }
     return nil
 }
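Only the tail of doRequest is visible in the hunk above. As a rough illustration of the pattern it follows (decode the JSON response only when the caller passes a non-nil result object), here is a stand-in written against the standard library; the real helper uses the framework's util.Request and the instance's BasicAuth, and none of the names below come from the repository:

// Illustrative stand-in only; not the console's actual doRequest.
// Requires "bytes", "encoding/json", and "net/http".
func doJSONRequest(client *http.Client, method, url string, body []byte, out interface{}) error {
    req, err := http.NewRequest(method, url, bytes.NewReader(body))
    if err != nil {
        return err
    }
    resp, err := client.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if out == nil {
        // mirrors the `return nil` branch visible above: status-only calls skip decoding
        return nil
    }
    return json.NewDecoder(resp.Body).Decode(out)
}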
@@ -105,10 +105,16 @@ type IndexStateInfo struct {
     IndexDocs float64
 }

+const (
+    IndexMigrationV0 = 0
+    IndexMigrationV1 = 1
+)
+
 type IndexMigrationTaskConfig struct {
     Source IndexMigrationSourceConfig `json:"source"`
     Target IndexMigrationTargetConfig `json:"target"`
     Execution ExecutionConfig `json:"execution"`
+    Version int `json:"version"`
 }

 type IndexMigrationSourceConfig struct {
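The new Version field and constants tie into how the dispatcher (later in this diff) decides whether a subtask can skip the scroll phase. A hedged example of building a v1 config the way the dispatcher does further down; all field values here are empty placeholders, and the package alias migration_model follows the dispatcher code below:

cfg := migration_model.IndexMigrationTaskConfig{
    Source:    migration_model.IndexMigrationSourceConfig{}, // placeholder
    Target:    migration_model.IndexMigrationTargetConfig{}, // placeholder
    Execution: migration_model.ExecutionConfig{},            // placeholder
    Version:   migration_model.IndexMigrationV1,             // marks the task as v1
}
body := util.MustToJSONBytes(cfg) // same serialization helper the dispatcher uses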
@@ -356,9 +356,9 @@ func (p *DispatcherProcessor) handleRunningSubTask(taskItem *task2.Task) error {
         p.saveTaskAndWriteLog(taskItem, &task2.TaskResult{
             Success: true,
         }, "index migration completed")
+        // clean disk queue if bulk completed
+        p.cleanGatewayQueue(taskItem)
     }
-    // clean disk queue if bulk failed/completed
-    p.cleanGatewayQueue(taskItem)
     p.decrInstanceJobs(instanceID)
     return nil
@@ -446,8 +446,6 @@ func (p *DispatcherProcessor) handlePendingStopSubTask(taskItem *task2.Task) error {
     if len(tasks) == 0 {
         taskItem.Status = task2.StatusStopped
         p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("index migration task [%s] stopped", taskItem.ID))
-        // clean disk queue if manually stopped
-        p.cleanGatewayQueue(taskItem)
     }
     return nil
 }
@@ -619,6 +617,13 @@ func (p *DispatcherProcessor) handleSplitSubTask(taskItem *task2.Task) error {
 }

 func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error {
+    cfg := migration_model.IndexMigrationTaskConfig{}
+    err := migration_util.GetTaskConfig(taskItem, &cfg)
+    if err != nil {
+        log.Errorf("failed to get task config, err: %v", err)
+        return fmt.Errorf("got wrong config of task [%s]", taskItem.ID)
+    }
+
     scrollTask, bulkTask, err := p.getScrollBulkPipelineTasks(taskItem)
     if err != nil {
         log.Errorf("failed to get pipeline tasks, err: %v", err)
@@ -630,28 +635,62 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error {
         return nil
     }

-    instance, err := p.getPreferenceInstance(taskItem.ParentId[0])
-    if err != nil {
-        return fmt.Errorf("get preference intance error: %w", err)
-    }
-    if p.getInstanceState(instance.ID).Total >= p.config.MaxTasksPerInstance {
-        log.Debugf("hit max tasks per instance with %d, skip dispatch", p.config.MaxTasksPerInstance)
-        return nil
+    instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
+    totalDocs := cfg.Source.DocCount
+    scrolled, _, err := p.checkScrollPipelineTaskStatus(scrollTask, totalDocs)
+
+    redoScroll := true
+    if cfg.Version >= migration_model.IndexMigrationV1 {
+        // skip scroll if possible
+        if scrolled && err == nil {
+            redoScroll = false
+            // reset queue consumer offset
+            err = p.resetGatewayQueue(taskItem)
+            if err != nil {
+                log.Infof("task [%s] failed to reset gateway queue, redo scroll", taskItem.ID)
+                redoScroll = true
+            }
+        }
     }

-    // try to clear disk queue before running es_scroll
-    p.cleanGatewayQueue(taskItem)
+    if !redoScroll {
+        migration_util.WriteLog(taskItem, nil, fmt.Sprintf("task [%s] skiping scroll", taskItem.ID))
+    } else {
+        var executionConfig migration_model.ExecutionConfig
+        if cfg.Version >= migration_model.IndexMigrationV1 {
+            executionConfig = cfg.Execution
+        } else {
+            executionConfig, err = p.getExecutionConfigFromMajorTask(taskItem)
+            if err != nil {
+                return fmt.Errorf("get execution config from parent task failed, err: %v", err)
+            }
+        }

-    // update scroll task to ready
-    scrollTask.Metadata.Labels["execution_instance_id"] = instance.ID
-    scrollTask.Status = task2.StatusReady
-    err = orm.Update(nil, scrollTask)
-    if err != nil {
-        return fmt.Errorf("update scroll pipeline task error: %w", err)
+        // get a new instanceID
+        instance, err := p.getPreferenceInstance(executionConfig)
+        if err != nil {
+            return fmt.Errorf("get preference intance error: %w", err)
+        }
+        if p.getInstanceState(instance.ID).Total >= p.config.MaxTasksPerInstance {
+            log.Debugf("hit max tasks per instance with %d, skip dispatch", p.config.MaxTasksPerInstance)
+            return nil
+        }
+        instanceID = instance.ID
+
+        // update instance info first
+        scrollTask.Metadata.Labels["execution_instance_id"] = instanceID
+        // try to clear disk queue before running es_scroll
+        p.cleanGatewayQueue(taskItem)
+        // update scroll task to ready
+        scrollTask.Status = task2.StatusReady
+        err = orm.Update(nil, scrollTask)
+        if err != nil {
+            return fmt.Errorf("update scroll pipeline task error: %w", err)
+        }
     }

     // update bulk task to init
-    bulkTask.Metadata.Labels["execution_instance_id"] = instance.ID
+    bulkTask.Metadata.Labels["execution_instance_id"] = instanceID
     bulkTask.Status = task2.StatusInit
     err = orm.Update(nil, bulkTask)
     if err != nil {
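The heart of the rewritten handleScheduleSubTask above is the decision whether the es_scroll phase can be skipped. A condensed restatement of just that decision, as a sketch; the real code additionally re-picks an execution instance, updates task labels, and marks the scroll/bulk tasks ready:

// Condensed sketch of the skip-scroll decision. scrolled/checkErr stand in for
// the result of p.checkScrollPipelineTaskStatus, resetQueue for p.resetGatewayQueue.
func shouldRedoScroll(cfg migration_model.IndexMigrationTaskConfig, scrolled bool, checkErr error, resetQueue func() error) bool {
    // pre-v1 tasks always redo the scroll
    if cfg.Version < migration_model.IndexMigrationV1 {
        return true
    }
    // v1 tasks may reuse already-scrolled data, but only if the scroll finished
    // cleanly and the gateway queue consumer offset can be reset
    if scrolled && checkErr == nil && resetQueue() == nil {
        return false
    }
    return true
}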
@@ -660,7 +699,7 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error {

     // update sub migration task status to running and save task log
     taskItem.RetryTimes++
-    taskItem.Metadata.Labels["execution_instance_id"] = instance.ID
+    taskItem.Metadata.Labels["execution_instance_id"] = instanceID
     taskItem.Metadata.Labels["index_docs"] = 0
     taskItem.Metadata.Labels["scrolled_docs"] = 0
     taskItem.Status = task2.StatusRunning
@@ -670,11 +709,12 @@ func (p *DispatcherProcessor) handleScheduleSubTask(taskItem *task2.Task) error {
         Success: true,
     }, fmt.Sprintf("task [%s] started", taskItem.ID))
     // update dispatcher state
-    p.incrInstanceJobs(instance.ID)
+    p.incrInstanceJobs(instanceID)
     return nil
 }

-func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instance model.Instance, err error) {
+func (p *DispatcherProcessor) getExecutionConfigFromMajorTask(taskItem *task2.Task) (config migration_model.ExecutionConfig, err error) {
+    majorTaskID := taskItem.ParentId[0]
     majorTask := task2.Task{}
     majorTask.ID = majorTaskID
     _, err = orm.Get(&majorTask)
@@ -688,11 +728,16 @@ func (p *DispatcherProcessor) getPreferenceInstance(majorTaskID string) (instance model.Instance, err error) {
         log.Errorf("failed to get task config, err: %v", err)
         return
     }
+    config = cfg.Settings.Execution
+    return
+}
+
+func (p *DispatcherProcessor) getPreferenceInstance(config migration_model.ExecutionConfig) (instance model.Instance, err error) {
     var (
         total = math.MaxInt
         tempInst = model.Instance{}
     )
-    for _, node := range cfg.Settings.Execution.Nodes.Permit {
+    for _, node := range config.Nodes.Permit {
         instanceTotal := p.getInstanceState(node.ID).Total
         if instanceTotal < total {
             if p.config.CheckInstanceAvailable {
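getPreferenceInstance now takes the execution config directly and, as before, picks the permitted node that is currently running the fewest jobs. A minimal sketch of that selection policy; the node struct and jobCount function are stand-ins, and the availability check behind p.config.CheckInstanceAvailable is omitted:

// Stand-in types: the real code iterates config.Nodes.Permit and uses
// p.getInstanceState(node.ID).Total as the job count.
type permittedNode struct{ ID string }

func pickLeastLoaded(permit []permittedNode, jobCount func(instanceID string) int) (string, bool) {
    bestID, bestTotal := "", math.MaxInt
    for _, node := range permit {
        if total := jobCount(node.ID); total < bestTotal {
            bestID, bestTotal = node.ID, total
        }
    }
    return bestID, bestID != ""
}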
@@ -917,6 +962,7 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) error {
             Source: partitionSource,
             Target: target,
             Execution: clusterMigrationTask.Settings.Execution,
+            Version: migration_model.IndexMigrationV1,
         }),
     }
     partitionMigrationTask.ID = util.GetUUID()
@@ -930,8 +976,10 @@ func (p *DispatcherProcessor) splitMajorMigrationTask(taskItem *task2.Task) error {
         source.DocCount = index.Source.Docs

         indexParameters := migration_model.IndexMigrationTaskConfig{
-            Source: source,
-            Target: target,
+            Source:    source,
+            Target:    target,
+            Execution: clusterMigrationTask.Settings.Execution,
+            Version:   migration_model.IndexMigrationV1,
         }
         indexMigrationTask := task2.Task{
             ParentId: []string{taskItem.ID},
@@ -1315,6 +1363,8 @@ func (p *DispatcherProcessor) refreshInstanceJobsFromES() error {
 }

 func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) {
+    log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID)
+
     var err error
     instance := model.Instance{}
     instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
@@ -1338,3 +1388,29 @@ func (p *DispatcherProcessor) cleanGatewayQueue(taskItem *task2.Task) {
         log.Errorf("failed to delete queue, err: %v", err)
     }
 }
+
+func (p *DispatcherProcessor) resetGatewayQueue(taskItem *task2.Task) error {
+    log.Debugf("resetting gateway queue offset for task [%s]", taskItem.ID)
+
+    var err error
+    instance := model.Instance{}
+    instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
+    _, err = orm.Get(&instance)
+    if err != nil {
+        log.Errorf("failed to get instance, err: %v", err)
+        return err
+    }
+
+    selector := util.MapStr{
+        "labels": util.MapStr{
+            "migration_task_id": taskItem.ID,
+        },
+    }
+    err = instance.DeleteQueueConsumersBySelector(selector)
+    if err != nil {
+        log.Errorf("failed to delete queue consumers, err: %v", err)
+        return err
+    }
+
+    return nil
+}
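The selector built by resetGatewayQueue becomes the JSON body of a DELETE against /queue/consumer/_search on the target gateway instance (see DeleteQueueConsumersBySelector earlier in this diff). A hedged example of the resulting payload, using a made-up task ID:

// The task ID below is a placeholder; only the selector shape comes from the code above.
selector := util.MapStr{
    "labels": util.MapStr{
        "migration_task_id": "example-task-id",
    },
}
body := util.MustToJSONBytes(util.MapStr{"selector": selector})
// body is roughly {"selector":{"labels":{"migration_task_id":"example-task-id"}}}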
@@ -92,7 +92,7 @@ func (p *processor) handleReadyPipelineTask(taskItem *task.Task) error {
     taskItem.StartTimeInMillis = time.Now().UnixMilli()
     p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
         Success: true,
-    }, fmt.Sprintf("pipeline task [%s] started", taskItem.ID))
+    }, fmt.Sprintf("[%v] pipeline task [%s] started", taskItem.Metadata.Labels["pipeline_id"], taskItem.ID))

     return nil
 }
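The remaining hunks only change log formatting: task log messages are now prefixed with the pipeline_id label or the pipeline type (es_scroll, bulk_indexing), so the per-stage messages can be told apart in the task log. For example, assuming a hypothetical label value of "es_scroll":

msg := fmt.Sprintf("[%v] pipeline task [%s] started", "es_scroll", taskItem.ID)
// yields "[es_scroll] pipeline task [<task id>] started"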
@@ -138,7 +138,7 @@ func (p *processor) handleRunningEsScrollPipelineTask(taskItem *task.Task) error {
     p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
         Success: errMsg == "",
         Error: errMsg,
-    }, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID))
+    }, fmt.Sprintf("[es_scroll] pipeline task [%s] completed", taskItem.ID))
     p.cleanGatewayPipeline(taskItem)
     return nil
 }
@@ -171,7 +171,7 @@ func (p *processor) handleRunningBulkIndexingPipelineTask(taskItem *task.Task) error {
     p.saveTaskAndWriteLog(taskItem, &task.TaskResult{
         Success: errMsg == "",
         Error: errMsg,
-    }, fmt.Sprintf("pipeline task [%s] completed", taskItem.ID))
+    }, fmt.Sprintf("[bulk_indexing] pipeline task [%s] completed", taskItem.ID))
     p.cleanGatewayPipeline(taskItem)
     return nil
 }
@@ -194,7 +194,7 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error {

     if stopped {
         taskItem.Status = task.StatusStopped
-        p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("task [%s] stopped", taskItem.ID))
+        p.saveTaskAndWriteLog(taskItem, nil, fmt.Sprintf("[%v] task [%s] stopped", taskItem.Metadata.Labels["pipeline_id"], taskItem.ID))
         p.cleanGatewayPipeline(taskItem)
         return nil
     }