[migration] get instance from scheduler, initialize instance automatically

Kassian Sun 2023-05-30 11:29:09 +08:00
parent dd98ba7dff
commit 09ba88c411
7 changed files with 199 additions and 36 deletions
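In short: call sites that used to load a gateway instance directly with orm.Get now ask the scheduler, and the scheduler's GetInstance additionally makes sure the instance's logging pipelines (pipeline_logging_merge, ingest_pipeline_logging) are created and running. A minimal sketch of the new call pattern, with simplified stand-in types (the real code uses model.Instance and migration_model.Scheduler; bodies here are illustrative only):

package sketch

import "errors"

// Instance stands in for model.Instance.
type Instance struct{ ID string }

// Scheduler mirrors the migration_model.Scheduler interface after this commit:
// GetInstance both loads the instance and makes sure it's initialized.
type Scheduler interface {
	GetInstance(instanceID string) (*Instance, error)
}

// cleanQueue shows the new shape of the call sites: read the
// execution_instance_id label, then ask the scheduler instead of orm.Get.
func cleanQueue(s Scheduler, labels map[string]string) error {
	instanceID := labels["execution_instance_id"]
	if instanceID == "" {
		return errors.New("task not scheduled yet")
	}
	instance, err := s.GetInstance(instanceID)
	if err != nil {
		return err
	}
	_ = instance // ... clean up the task's gateway queue on this instance ...
	return nil
}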

View File

@@ -6,6 +6,7 @@ package model
import (
"context"
"errors"
"fmt"
"net/http"
"time"
@@ -69,6 +70,25 @@ func (inst *Instance) DeletePipeline(pipelineID string) error {
return inst.doRequest(req, nil)
}
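// GetPipeline fetches the status of a single pipeline from the instance's
// /pipeline/task/ endpoint, with a one-second request timeout.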
func (inst *Instance) GetPipeline(pipelineID string) (*pipeline.PipelineStatus, error) {
if pipelineID == "" {
return nil, errors.New("invalid pipelineID")
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
req := &util.Request{
Method: http.MethodGet,
Url: fmt.Sprintf("%s/pipeline/task/%s", inst.Endpoint, pipelineID),
Context: ctx,
}
res := pipeline.PipelineStatus{}
err := inst.doRequest(req, &res)
if err != nil {
return nil, err
}
return &res, nil
}
func (inst *Instance) GetPipelinesByIDs(pipelineIDs []string) (pipeline.GetPipelinesResponse, error) {
body := util.MustToJSONBytes(util.MapStr{
"ids": pipelineIDs,

View File

@@ -5,7 +5,6 @@ import (
"time"
log "github.com/cihub/seelog"
"infini.sh/console/model"
migration_model "infini.sh/console/plugin/migration/model"
migration_util "infini.sh/console/plugin/migration/util"
@@ -443,14 +442,12 @@ func (p *processor) getPipelineTasks(taskItem *task.Task, cfg *migration_model.I
func (p *processor) cleanGatewayQueue(taskItem *task.Task) {
log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID)
var err error
instance := model.Instance{}
instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
if instance.ID == "" {
instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
if instanceID == "" {
log.Debugf("task [%s] not scheduled yet, skip cleaning queue", taskItem.ID)
return
}
_, err = orm.Get(&instance)
instance, err := p.scheduler.GetInstance(instanceID)
if err != nil {
log.Errorf("failed to get instance, err: %v", err)
return

View File

@@ -7,7 +7,6 @@ import (
log "github.com/cihub/seelog"
"infini.sh/console/model"
migration_model "infini.sh/console/plugin/migration/model"
migration_util "infini.sh/console/plugin/migration/util"
@@ -503,14 +502,12 @@ func (p *processor) getScrollBulkPipelineTasks(taskItem *task.Task) (scrollTask
func (p *processor) cleanGatewayQueue(taskItem *task.Task) {
log.Debugf("cleaning gateway queue for task [%s]", taskItem.ID)
var err error
instance := model.Instance{}
instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
if instance.ID == "" {
instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
if instanceID == "" {
log.Debugf("task [%s] not scheduled yet, skip cleaning queue", taskItem.ID)
return
}
_, err = orm.Get(&instance)
instance, err := p.scheduler.GetInstance(instanceID)
if err != nil {
log.Errorf("failed to get instance, err: %v", err)
return
@@ -530,10 +527,8 @@ func (p *processor) cleanGatewayQueue(taskItem *task.Task) {
func (p *processor) resetGatewayQueue(taskItem *task.Task) error {
log.Debugf("resetting gateway queue offset for task [%s]", taskItem.ID)
var err error
instance := model.Instance{}
instance.ID, _ = util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
_, err = orm.Get(&instance)
instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
instance, err := p.scheduler.GetInstance(instanceID)
if err != nil {
log.Errorf("failed to get instance, err: %v", err)
return err

View File

@@ -8,6 +8,7 @@ import (
type Scheduler interface {
GetPreferenceInstance(config ExecutionConfig) (instance *model.Instance, err error)
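// GetInstance loads an instance by ID and makes sure it's initialized.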
GetInstance(instanceID string) (instance *model.Instance, err error)
IncrInstanceJobs(instanceID string)
DecrInstanceJobs(instanceID string)
RefreshInstanceJobsFromES() error

View File

@@ -96,7 +96,7 @@ func newMigrationDispatcherProcessor(c *config.Config) (pipeline.Processor, erro
if err != nil {
return nil, err
}
processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName)
processor.pipelineTaskProcessor = pipeline_task.NewProcessor(cfg.Elasticsearch, cfg.IndexName, cfg.LogIndexName, processor.scheduler)
processor.indexMigrationTaskProcessor = index_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)
processor.clusterMigrationTaskProcessor = cluster_migration.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)
processor.indexComparisonTaskProcessor = index_comparison.NewProcessor(cfg.Elasticsearch, cfg.IndexName, processor.scheduler)

View File

@@ -14,7 +14,6 @@ import (
migration_util "infini.sh/console/plugin/migration/util"
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/task"
"infini.sh/framework/core/util"
)
@@ -23,13 +22,16 @@ type processor struct {
Elasticsearch string
IndexName string
LogIndexName string
scheduler migration_model.Scheduler
}
func NewProcessor(elasticsearch, indexName, logIndexName string) migration_model.Processor {
func NewProcessor(elasticsearch, indexName, logIndexName string, scheduler migration_model.Scheduler) migration_model.Processor {
return &processor{
Elasticsearch: elasticsearch,
IndexName: indexName,
LogIndexName: logIndexName,
scheduler: scheduler,
}
}
@@ -167,9 +169,10 @@ func (p *processor) handlePendingStopPipelineTask(taskItem *task.Task) error {
return nil
}
func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.Instance, err error) {
func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance *model.Instance, err error) {
instance, err = p.getPipelineExecutionInstance(taskItem)
if err != nil {
log.Errorf("failed to get execution instance for task [%s], err: %v", taskItem.ID, err)
return
}
err = instance.DeletePipeline(taskItem.ID)
@@ -181,19 +184,13 @@ func (p *processor) cleanGatewayPipeline(taskItem *task.Task) (instance model.In
return instance, nil
}
func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (instance model.Instance, err error) {
instanceID := taskItem.Metadata.Labels["execution_instance_id"]
instance.ID, err = util.ExtractString(instanceID)
func (p *processor) getPipelineExecutionInstance(taskItem *task.Task) (*model.Instance, error) {
instanceID, _ := util.ExtractString(taskItem.Metadata.Labels["execution_instance_id"])
instance, err := p.scheduler.GetInstance(instanceID)
if err != nil {
log.Error("failed to get execution_instance_id")
return
return nil, err
}
_, err = orm.Get(&instance)
if err != nil {
log.Errorf("failed to get instance, err: %v", err)
return
}
return
return instance, nil
}
func (p *processor) getPipelineLogs(taskItem *task.Task, status []string, timestampGte int64) ([]util.MapStr, error) {

View File

@@ -1,8 +1,10 @@
package scheduler
import (
"errors"
"fmt"
"math"
"strings"
"sync"
"time"
@@ -13,10 +15,13 @@ import (
"infini.sh/framework/core/elastic"
"infini.sh/framework/core/orm"
"infini.sh/framework/core/pipeline"
"infini.sh/framework/core/task"
"infini.sh/framework/core/util"
)
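// initializeInterval throttles per-instance initialization: an instance's
// pipelines are re-checked at most once per interval.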
const initializeInterval = 10 * time.Second
type scheduler struct {
Elasticsearch string
IndexName string
@@ -29,8 +34,10 @@ type scheduler struct {
type DispatcherState struct {
Total int
LastInitializedAt time.Time
}
// NOTE: currently we assume tasks are scheduled sequentially, so GetInstance/GetPreferenceInstance doesn't need to handle locking
func NewScheduler(elasticsearch, indexName string, checkInstanceAvailable bool, maxTasksPerInstance int) (migration_model.Scheduler, error) {
scheduler := &scheduler{
Elasticsearch: elasticsearch,
@@ -78,19 +85,147 @@ func (p *scheduler) GetPreferenceInstance(config migration_model.ExecutionConfig
return nil, fmt.Errorf("no available instance")
}
instance := model.Instance{}
instance.ID = minID
_, err = orm.Get(&instance)
instance, err := p.GetInstance(minID)
if err != nil {
return nil, err
}
if p.getInstanceState(minID).Total > p.MaxTasksPerInstance {
return nil, migration_model.ErrHitMax
}
return instance, nil
}
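// GetInstance loads the instance from ES and then lazily initializes it;
// initialization failures are logged as warnings and don't fail the lookup.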
func (p *scheduler) GetInstance(instanceID string) (*model.Instance, error) {
if instanceID == "" {
return nil, errors.New("invalid instanceID")
}
instance := model.Instance{}
instance.ID = instanceID
_, err := orm.Get(&instance)
if err != nil {
log.Errorf("failed to get instance [%s] from orm, err: %v", instance.ID, err)
return nil, err
}
err = p.initializeInstance(&instance)
if err != nil {
log.Warnf("failed to initialize instance [%s], err: %v", instance.ID, err)
}
return &instance, nil
}
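// initializeInstance makes sure the pipeline_logging_merge and
// ingest_pipeline_logging pipelines exist and are running on the instance,
// creating or restarting them as needed.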
func (p *scheduler) initializeInstance(instance *model.Instance) error {
lastInitializedAt := p.getLastInitializedAt(instance.ID)
if time.Since(lastInitializedAt) < initializeInterval {
return nil
}
status, err := instance.GetPipeline("pipeline_logging_merge")
if err != nil {
if strings.Contains(err.Error(), "pipeline not found") {
log.Infof("pipeline_logging_merge not found on instance [%s], initializing", instance.ID)
err := p.createPipelineLoggingMerge(instance)
if err != nil {
return err
}
}
} else if status.State == pipeline.STOPPED {
log.Infof("pipeline_logging_merge stopped on instance [%s], starting", instance.ID)
err = instance.StartPipeline("pipeline_logging_merge")
if err != nil {
return err
}
}
status, err = instance.GetPipeline("ingest_pipeline_logging")
if err != nil {
if strings.Contains(err.Error(), "pipeline not found") {
log.Infof("ingest_pipeline_logging not found on instance [%s], initializing", instance.ID)
err := p.createIngestPipelineLogging(instance)
if err != nil {
return err
}
}
} else if status.State == pipeline.STOPPED {
log.Infof("ingest_pipeline_logging stopped on instance [%s], starting", instance.ID)
err = instance.StartPipeline("ingest_pipeline_logging")
if err != nil {
return err
}
}
p.setLastInitializedAt(instance.ID, time.Now())
return nil
}
// TODO: we're currently using the same configuration as the default gateway.yml;
// the user could have changed the following configurations manually:
// - input_queue (metrics.logging_queue)
// - elasticsearch (elasticsearch.name)
func (p *scheduler) createPipelineLoggingMerge(instance *model.Instance) error {
cfg := &migration_model.PipelineTaskConfig{
Name: "pipeline_logging_merge",
AutoStart: true,
KeepRunning: true,
Processor: []util.MapStr{
util.MapStr{
"indexing_merge": util.MapStr{
"input_queue": "logging",
"idle_timeout_in_seconds": 1,
"elasticsearch": "logging-server",
"index_name": ".infini_logs",
"output_queue": util.MapStr{
"name": "gateway-pipeline-logs",
"label": util.MapStr{
"tag": "pipeline_logging",
},
},
"worker_size": 1,
"bulk_size_in_kb": 1,
},
},
},
}
err := instance.CreatePipeline(util.MustToJSONBytes(cfg))
if err != nil {
log.Errorf("failed to create pipeline_logging_merge on instance [%s], err: %+v", instance.ID, err)
return err
}
return nil
}
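// createIngestPipelineLogging creates the bulk_indexing pipeline that consumes
// the queue produced by pipeline_logging_merge (tag: pipeline_logging).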
func (p *scheduler) createIngestPipelineLogging(instance *model.Instance) error {
cfg := &migration_model.PipelineTaskConfig{
Name: "ingest_pipeline_logging",
AutoStart: true,
KeepRunning: true,
Processor: []util.MapStr{
util.MapStr{
"bulk_indexing": util.MapStr{
"bulk": util.MapStr{
"compress": true,
"batch_size_in_mb": 1,
"batch_size_in_docs": 1,
},
"consumer": util.MapStr{
"fetch_max_messages": 100,
},
"queues": util.MapStr{
"type": "indexing_merge",
"tag": "pipeline_logging",
},
},
},
},
}
err := instance.CreatePipeline(util.MustToJSONBytes(cfg))
if err != nil {
log.Errorf("failed to create ingest_pipeline_logging on instance [%s], err: %+v", instance.ID, err)
return err
}
return nil
}
func (p *scheduler) RefreshInstanceJobsFromES() error {
log.Debug("refreshing instance state from ES")
p.stateLock.Lock()
@@ -123,6 +258,24 @@ func (p *scheduler) IncrInstanceJobs(instanceID string) {
p.state[instanceID] = instanceState
}
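// getLastInitializedAt/setLastInitializedAt access LastInitializedAt under
// stateLock. Note: setLastInitializedAt only updates instances that already
// have a state entry; unknown instances keep the zero time and get
// re-initialized on the next GetInstance.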
func (p *scheduler) getLastInitializedAt(instanceID string) time.Time {
p.stateLock.Lock()
defer p.stateLock.Unlock()
if st, ok := p.state[instanceID]; ok {
return st.LastInitializedAt
}
return time.Time{}
}
func (p *scheduler) setLastInitializedAt(instanceID string, t time.Time) {
p.stateLock.Lock()
defer p.stateLock.Unlock()
if st, ok := p.state[instanceID]; ok {
st.LastInitializedAt = t
p.state[instanceID] = st
}
}
func (p *scheduler) getInstanceState(instanceID string) DispatcherState {
p.stateLock.Lock()
defer p.stateLock.Unlock()