From 09ba88c411d050f3c579e679986f4bd2e3270951 Mon Sep 17 00:00:00 2001 From: Kassian Sun Date: Tue, 30 May 2023 11:29:09 +0800 Subject: [PATCH] [migration] get instance from scheduler, initialize instance automatically --- model/instance.go | 20 +++ .../index_comparison/index_comparison.go | 9 +- .../index_migration/index_migration.go | 15 +- plugin/migration/model/scheduler.go | 1 + plugin/migration/pipeline.go | 2 +- .../migration/pipeline_task/pipeline_task.go | 25 ++- plugin/migration/scheduler/scheduler.go | 163 +++++++++++++++++- 7 files changed, 199 insertions(+), 36 deletions(-) diff --git a/model/instance.go b/model/instance.go index 2144545b..81b28946 100644 --- a/model/instance.go +++ b/model/instance.go @@ -6,6 +6,7 @@ package model import ( "context" + "errors" "fmt" "net/http" "time" @@ -69,6 +70,25 @@ func (inst *Instance) DeletePipeline(pipelineID string) error { return inst.doRequest(req, nil) } +func (inst *Instance) GetPipeline(pipelineID string) (*pipeline.PipelineStatus, error) { + if pipelineID == "" { + return nil, errors.New("invalid pipelineID") + } + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + req := &util.Request{ + Method: http.MethodGet, + Url: fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID), + Context: ctx, + } + res := pipeline.PipelineStatus{} + err := inst.doRequest(req, &res) + if err != nil { + return nil, err + } + return &res, nil +} + func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipelinesResponse, error) { body := util.MustToJSONBytes(util.MapStr{ "ids": pipelineIDs, diff --git a/plugin/migration/index_comparison/index_comparison.go b/plugin/migration/index_comparison/index_comparison.go index f31d6ffe..8e598fe3 100644 --- a/plugin/migration/index_comparison/index_comparison.go +++ b/plugin/migration/index_comparison/index_comparison.go @@ -5,7 +5,6 @@ import ( "time" log "github.com/cihub/seelog" - "infini.sh/console/model" migration_model 
"infini.sh/console/plugin/migration/model" migration_util "infini.sh/console/plugin/migration/util" @@ -443,14 +442,12 @@ func (p *processor) getPipelineTasks(taskItem *task.Task, cfg *migration_model.I func (p *processor) cleanGatewayQueue(taskItem *task.Task) { log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID) - var err error - instance := model.Instance{} - instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) - if instance.ID == "" { + instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + if instanceID == "" { log.Debugf("task [%s] not scheduled yet, skip cleaning queue", taskItem.ID) return } - _, err = orm.Get(&instance) + instance, err := p.scheduler.GetInstance(instanceID) if err != nil { log.Errorf("failed to get instance, err: %v", err) return diff --git a/plugin/migration/index_migration/index_migration.go b/plugin/migration/index_migration/index_migration.go index 46ad2f23..b82062c5 100644 --- a/plugin/migration/index_migration/index_migration.go +++ b/plugin/migration/index_migration/index_migration.go @@ -7,7 +7,6 @@ import ( log "github.com/cihub/seelog" - "infini.sh/console/model" migration_model "infini.sh/console/plugin/migration/model" migration_util "infini.sh/console/plugin/migration/util" @@ -503,14 +502,12 @@ func (p *processor) getScrollBulkPipelineTasks(taskItem *task.Task) (scrollTask func (p *processor) cleanGatewayQueue(taskItem *task.Task) { log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID) - var err error - instance := model.Instance{} - instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) - if instance.ID == "" { + instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + if instanceID == "" { log.Debugf("task [%s] not scheduled yet, skip cleaning queue", taskItem.ID) return } - _, err = orm.Get(&instance) + instance, err := p.scheduler.GetInstance(instanceID) if err != nil { 
log.Errorf("failed to get instance, err: %v", err) return @@ -530,10 +527,8 @@ func (p *processor) cleanGatewayQueue(taskItem *task.Task) { func (p *processor) resetGatewayQueue(taskItem *task.Task) error { log.Debugf("resetting gateway queue offset for task [%s]", taskItem.ID) - var err error - instance := model.Instance{} - instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) - _, err = orm.Get(&instance) + instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + instance, err := p.scheduler.GetInstance(instanceID) if err != nil { log.Errorf("failed to get instance, err: %v", err) return err diff --git a/plugin/migration/model/scheduler.go b/plugin/migration/model/scheduler.go index f7f62979..d1526f5f 100644 --- a/plugin/migration/model/scheduler.go +++ b/plugin/migration/model/scheduler.go @@ -8,6 +8,7 @@ import ( type Scheduler interface { GetPreferenceInstance(config ExecutionConfig) (instance *model.Instance, err error) + GetInstance(instanceID string) (instance *model.Instance, err error) IncrInstanceJobs(instanceID string) DecrInstanceJobs(instanceID string) RefreshInstanceJobsFromES() error diff --git a/plugin/migration/pipeline.go b/plugin/migration/pipeline.go index 392f8fe9..f5185276 100644 --- a/plugin/migration/pipeline.go +++ b/plugin/migration/pipeline.go @@ -96,7 +96,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro if err != nil { return nil, err } - processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName) + processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName, processor.scheduler) processor.indexMigrationTaskProcessor = index_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) processor.clusterMigrationTaskProcessor = cluster_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) 
processor.indexComparisonTaskProcessor = index_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler) diff --git a/plugin/migration/pipeline_task/pipeline_task.go b/plugin/migration/pipeline_task/pipeline_task.go index 70ff0484..9106cc6a 100644 --- a/plugin/migration/pipeline_task/pipeline_task.go +++ b/plugin/migration/pipeline_task/pipeline_task.go @@ -14,7 +14,6 @@ import ( migration_util "infini.sh/console/plugin/migration/util" "infini.sh/framework/core/elastic" - "infini.sh/framework/core/orm" "infini.sh/framework/core/task" "infini.sh/framework/core/util" ) @@ -23,13 +22,16 @@ type processor struct { Elasticsearch string IndexName string LogIndexName string + + scheduler migration_model.Scheduler } -func NewProcessor(elasticsearch, indexName, logIndexName string) migration_model.Processor { +func NewProcessor(elasticsearch, indexName, logIndexName string, scheduler migration_model.Scheduler) migration_model.Processor { return &processor{ Elasticsearch: elasticsearch, IndexName: indexName, LogIndexName: logIndexName, + scheduler: scheduler, } } @@ -167,9 +169,10 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error { return nil } -func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.Instance, err error) { +func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance *model.Instance, err error) { instance, err = p.getPipelineExecutionInstance(taskItem) if err != nil { + log.Errorf("failed to get execution instance for task [%s], err: %v", taskItem.ID, err) return } err = instance.DeletePipeline(taskItem.ID) @@ -181,19 +184,13 @@ func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.In return instance, nil } -func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (instance model.Instance, err error) { - instanceID := taskItem.Metadata.Labels["execution_instance_id"] - instance.ID, err = util.ExtractString(instanceID) +func (p *processor) 
getPipelineExecutionInstance(taskItem *task.Task) (*model.Instance, error) { + instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"]) + instance, err := p.scheduler.GetInstance(instanceID) if err != nil { - log.Error("failed to get execution_instance_id") - return + return nil, err } - _, err = orm.Get(&instance) - if err != nil { - log.Errorf("failed to get instance, err: %v", err) - return - } - return + return instance, nil } func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]util.MapStr, error) { diff --git a/plugin/migration/scheduler/scheduler.go b/plugin/migration/scheduler/scheduler.go index 1b645c61..23b6e675 100644 --- a/plugin/migration/scheduler/scheduler.go +++ b/plugin/migration/scheduler/scheduler.go @@ -1,8 +1,10 @@ package scheduler import ( + "errors" "fmt" "math" + "strings" "sync" "time" @@ -13,10 +15,13 @@ import ( "infini.sh/framework/core/elastic" "infini.sh/framework/core/orm" + "infini.sh/framework/core/pipeline" "infini.sh/framework/core/task" "infini.sh/framework/core/util" ) +const initializeInterval = 10 * time.Second + type scheduler struct { Elasticsearch string IndexName string @@ -28,9 +33,11 @@ type scheduler struct { } type DispatcherState struct { - Total int + Total int + LastInitializedAt time.Time } +// NOTE: currently we assume tasks are scheduled sequentially, so GetInstance/GetPreferenceInstance don't need to handle locking func NewScheduler(elasticsearch, indexName string, checkInstanceAvailable bool, maxTasksPerInstance int) (migration_model.Scheduler, error) { scheduler := &scheduler{ Elasticsearch: elasticsearch, @@ -78,19 +85,147 @@ func (p *scheduler) GetPreferenceInstance(config migration_model.ExecutionConfig return nil, fmt.Errorf("no available instance") } - instance := model.Instance{} - instance.ID = minID - - _, err = orm.Get(&instance) + instance, err := p.GetInstance(minID) if err != nil { return nil, err } if 
p.getInstanceState(minID).Total > p.MaxTasksPerInstance { return nil, migration_model.ErrHitMax } + return instance, nil +} + +func (p *scheduler) GetInstance(instanceID string) (*model.Instance, error) { + if instanceID == "" { + return nil, errors.New("invalid instanceID") + } + instance := model.Instance{} + instance.ID = instanceID + + _, err := orm.Get(&instance) + if err != nil { + log.Errorf("failed to get instance [%s] from orm, err: %v", instance.ID, err) + return nil, err + } + err = p.initializeInstance(&instance) + if err != nil { + log.Warnf("failed to initialized instance [%s], err: %v", instance.ID, err) + } return &instance, nil } +func (p *scheduler) initializeInstance(instance *model.Instance) error { + lastInitializedAt := p.getLastInitializedAt(instance.ID) + if time.Now().Sub(lastInitializedAt) < initializeInterval { + return nil + } + + status, err := instance.GetPipeline("pipeline_logging_merge") + if err != nil { + if strings.Contains(err.Error(), "pipeline not found") { + log.Infof("pipeline_logging_merge not found on instance [%s], initializing", instance.ID) + err := p.createPipelineLoggingMerge(instance) + if err != nil { + return err + } + } + } else if status.State == pipeline.STOPPED { + log.Infof("pipeline_logging_merge stopped on instance [%s], starting", instance.ID) + err = instance.StartPipeline("pipeline_logging_merge") + if err != nil { + return err + } + } + + status, err = instance.GetPipeline("ingest_pipeline_logging") + if err != nil { + if strings.Contains(err.Error(), "pipeline not found") { + log.Infof("ingest_pipeline_logging not found on instance [%s], initializing", instance.ID) + err := p.createIngestPipelineLogging(instance) + if err != nil { + return err + } + } + } else if status.State == pipeline.STOPPED { + log.Infof("ingest_pipeline_logging stopped on instance [%s], starting", instance.ID) + err = instance.StartPipeline("ingest_pipeline_logging") + if err != nil { + return err + } + } + + 
p.setLastInitializedAt(instance.ID, time.Now()) + return nil +} + +// TODO: now we're using the same configuration as the default gateway.yml +// user could change the following configurations manually: +// - input_queue (metrics.logging_queue) +// - elasticsearch (elasticsearch.name) +func (p *scheduler) createPipelineLoggingMerge(instance *model.Instance) error { + cfg := &migration_model.PipelineTaskConfig{ + Name: "pipeline_logging_merge", + AutoStart: true, + KeepRunning: true, + Processor: []util.MapStr{ + util.MapStr{ + "indexing_merge": util.MapStr{ + "input_queue": "logging", + "idle_timeout_in_seconds": 1, + "elasticsearch": "logging-server", + "index_name": ".infini_logs", + "output_queue": util.MapStr{ + "name": "gateway-pipeline-logs", + "label": util.MapStr{ + "tag": "pipeline_logging", + }, + }, + "worker_size": 1, + "bulk_size_in_kb": 1, + }, + }, + }, + } + err := instance.CreatePipeline(util.MustToJSONBytes(cfg)) + if err != nil { + log.Errorf("create pipeline_logging_merge [%s] failed, err: %+v", instance.ID, err) + return err + } + return nil +} + +func (p *scheduler) createIngestPipelineLogging(instance *model.Instance) error { + cfg := &migration_model.PipelineTaskConfig{ + Name: "ingest_pipeline_logging", + AutoStart: true, + KeepRunning: true, + Processor: []util.MapStr{ + util.MapStr{ + "bulk_indexing": util.MapStr{ + "bulk": util.MapStr{ + "compress": true, + "batch_size_in_mb": 1, + "batch_size_in_docs": 1, + }, + "consumer": util.MapStr{ + "fetch_max_messages": 100, + }, + "queues": util.MapStr{ + "type": "indexing_merge", + "tag": "pipeline_logging", + }, + }, + }, + }, + } + err := instance.CreatePipeline(util.MustToJSONBytes(cfg)) + if err != nil { + log.Errorf("create ingest_pipeline_logging [%s] failed, err: %+v", instance.ID, err) + return err + } + return nil +} + func (p *scheduler) RefreshInstanceJobsFromES() error { log.Debug("refreshing instance state from ES") p.stateLock.Lock() @@ -123,6 +258,24 @@ func (p *scheduler) 
IncrInstanceJobs(instanceID string) { p.state[instanceID] = instanceState } +func (p *scheduler) getLastInitializedAt(instanceID string) time.Time { + p.stateLock.Lock() + defer p.stateLock.Unlock() + if st, ok := p.state[instanceID]; ok { + return st.LastInitializedAt + } + return time.Time{} +} + +func (p *scheduler) setLastInitializedAt(instanceID string, t time.Time) { + p.stateLock.Lock() + defer p.stateLock.Unlock() + if st, ok := p.state[instanceID]; ok { + st.LastInitializedAt = t + p.state[instanceID] = st + } +} + func (p *scheduler) getInstanceState(instanceID string) DispatcherState { p.stateLock.Lock() defer p.stateLock.Unlock()