diff --git a/config/system_config.tpl b/config/system_config.tpl index fb9333dc..f0b67de3 100644 --- a/config/system_config.tpl +++ b/config/system_config.tpl @@ -51,11 +51,7 @@ pipeline: auto_start: true keep_running: true processor: - - metadata: - bulk_size_in_mb: 5 - bulk_max_docs_count: 5000 - fetch_max_messages: 100 - elasticsearch: "$[[CLUSTER_ID]]" + - consumer: queues: type: metadata category: elasticsearch @@ -63,15 +59,15 @@ pipeline: group: metadata when: cluster_available: ["$[[CLUSTER_ID]]"] + processor: + - metadata: + elasticsearch: "$[[CLUSTER_ID]]" + - name: activity_ingest auto_start: true keep_running: true processor: - - activity: - bulk_size_in_mb: 5 - bulk_max_docs_count: 5000 - fetch_max_messages: 100 - elasticsearch: "$[[CLUSTER_ID]]" + - consumer: queues: category: elasticsearch activity: true @@ -79,6 +75,9 @@ pipeline: group: activity when: cluster_available: ["$[[CLUSTER_ID]]"] + processor: + - activity: + elasticsearch: "$[[CLUSTER_ID]]" - name: migration_task_dispatcher auto_start: true keep_running: true diff --git a/plugin/elastic/activity.go b/plugin/elastic/activity.go index ba27abf4..8ae0f6d0 100644 --- a/plugin/elastic/activity.go +++ b/plugin/elastic/activity.go @@ -11,26 +11,17 @@ import ( "github.com/segmentio/encoding/json" "infini.sh/framework/core/config" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/errors" "infini.sh/framework/core/event" - "infini.sh/framework/core/global" "infini.sh/framework/core/orm" "infini.sh/framework/core/pipeline" "infini.sh/framework/core/queue" - "infini.sh/framework/core/rotate" "infini.sh/framework/core/util" - "runtime" - - "sync" - "time" ) type ActivityProcessor struct { - config *Config - runningConfigs map[string]*queue.QueueConfig - wg sync.WaitGroup - inFlightQueueConfigs sync.Map - detectorRunning bool - id string + config *Config + id string } func init() { @@ -39,44 +30,21 @@ func init() { func NewActivityProcessor(c *config.Config) (pipeline.Processor, error) { cfg := Config{ - NumOfWorkers: 1, - MaxWorkers: 10, - MaxConnectionPerHost: 1, - IdleTimeoutInSecond: 5, - DetectIntervalInMs: 10000, - Queues: map[string]interface{}{}, - - Consumer: queue.ConsumerConfig{ - Group: "activity-001", - Name: "activity-001", - FetchMinBytes: 1, - FetchMaxBytes: 10 * 1024 * 1024, - FetchMaxMessages: 500, - EOFRetryDelayInMs: 1000, - FetchMaxWaitMs: 10000, - }, - - DetectActiveQueue: true, - ValidateRequest: false, - SkipEmptyQueue: true, - SkipOnMissingInfo: false, - RotateConfig: rotate.DefaultConfig, - BulkConfig: elastic.DefaultBulkProcessorConfig, + MessageField: "messages", } if err := c.Unpack(&cfg); err != nil { - log.Error(err) return nil, fmt.Errorf("failed to unpack the configuration of flow_runner processor: %s", err) } - runner := ActivityProcessor{ - id: util.GetUUID(), - config: &cfg, - runningConfigs: map[string]*queue.QueueConfig{}, - inFlightQueueConfigs: sync.Map{}, + if cfg.Elasticsearch == "" { + return nil, errors.New("elasticsearch config was not found in metadata processor") } - runner.wg = sync.WaitGroup{} + runner := ActivityProcessor{ + id: util.GetUUID(), + config: &cfg, + } return &runner, nil } @@ -85,238 +53,32 @@ func (processor *ActivityProcessor) Name() string { return "activity" } -func (processor *ActivityProcessor) Process(c *pipeline.Context) error { - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Error("error in activity processor,", v) +func (processor *ActivityProcessor) Process(ctx *pipeline.Context) error { + //get message from queue + obj := ctx.Get(processor.config.MessageField) + if obj != nil { + messages := obj.([]queue.Message) + log.Tracef("get %v messages from context", len(messages)) + if len(messages) == 0 { + return nil + } + for _, pop := range messages { + typ, err := jsonparser.GetString(pop.Data, "metadata", "name") + if err != nil { + panic(err) } - } - log.Trace("exit activity processor") - }() - - //handle updates - if processor.config.DetectActiveQueue { - log.Tracef("detector running [%v]", processor.detectorRunning) - if !processor.detectorRunning { - processor.detectorRunning = true - processor.wg.Add(1) - go func(c *pipeline.Context) { - log.Tracef("init detector for active queue [%v] ", processor.id) - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Error("error in activity processor,", v) - } - } - processor.detectorRunning = false - log.Debug("exit detector for active queue") - processor.wg.Done() - }() - - for { - if c.IsCanceled() { - return - } - - if global.Env().IsDebug { - log.Tracef("inflight queues: %v", util.MapLength(&processor.inFlightQueueConfigs)) - processor.inFlightQueueConfigs.Range(func(key, value interface{}) bool { - log.Tracef("inflight queue:%v", key) - return true - }) - } - - cfgs := queue.GetConfigByLabels(processor.config.Queues) - for _, v := range cfgs { - if c.IsCanceled() { - return - } - //if have depth and not in in flight - if queue.HasLag(v) { - _, ok := processor.inFlightQueueConfigs.Load(v.Id) - if !ok { - log.Tracef("detecting new queue: %v", v.Name) - processor.HandleQueueConfig(v, c) - } - } - } - if processor.config.DetectIntervalInMs > 0 { - time.Sleep(time.Millisecond * time.Duration(processor.config.DetectIntervalInMs)) - } - } - }(c) - } - } else { - cfgs := queue.GetConfigByLabels(processor.config.Queues) - log.Debugf("filter queue by:%v, num of queues:%v", processor.config.Queues, len(cfgs)) - for _, v := range cfgs { - log.Tracef("checking queue: %v", v) - processor.HandleQueueConfig(v, c) - } - } - - processor.wg.Wait() - - return nil -} - -func (processor *ActivityProcessor) HandleQueueConfig(v *queue.QueueConfig, c *pipeline.Context) { - - if processor.config.SkipEmptyQueue { - if !queue.HasLag(v) { - if global.Env().IsDebug { - log.Tracef("skip empty queue:[%v]", v.Name) - } - return - } - } - - elasticsearch := processor.config.Elasticsearch - if elasticsearch == "" { - log.Error("elasticsearch config was not found in activity processor") - return - } - - meta := elastic.GetMetadata(util.ToString(elasticsearch)) - if meta == nil { - log.Debugf("metadata for [%v] is nil", elasticsearch) - return - } - - host := meta.GetActiveHost() - log.Debugf("random choose node [%v] to consume queue [%v]", host, v.Id) - processor.wg.Add(1) - - //go processor.NewBulkWorker("bulk_indexing_"+host,c, processor.bulkSizeInByte, v, host) - go processor.HandleMessage(c, v) - -} - -func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig *queue.QueueConfig) { - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Errorf("error in %s processor: %v", processor.Name(), v) - } - } - processor.wg.Done() - log.Tracef("exit %s processor", processor.Name()) - }() - - key := qConfig.Id - - if processor.config.MaxWorkers > 0 && util.MapLength(&processor.inFlightQueueConfigs) > processor.config.MaxWorkers { - log.Debugf("reached max num of workers, skip init [%v]", qConfig.Name) - return - } - - var workerID = util.GetUUID() - _, exists := processor.inFlightQueueConfigs.Load(key) - if exists { - log.Errorf("queue [%v] has more then one consumer", qConfig.Id) - return - } - - processor.inFlightQueueConfigs.Store(key, workerID) - log.Debugf("starting worker:[%v], queue:[%v]", workerID, qConfig.Name) - var consumer = queue.GetOrInitConsumerConfig(qConfig.Id, processor.config.Consumer.Group, processor.config.Consumer.Name) - initOffset, _ := queue.GetOffset(qConfig, consumer) - offset := initOffset - defer func() { - log.Debugf("worker:[%v] start consume queue:[%v] offset:%v", workerID, qConfig.Id, offset) - }() - - for { - if ctx.IsCanceled() { - return - } - - ctx1, messages, timeout, err := queue.Consume(qConfig, consumer, offset) - - if len(messages)==0{ - time.Sleep(time.Millisecond * time.Duration(500)) - } - - if timeout { - log.Tracef("timeout on queue:[%v]", qConfig.Name) - ctx.Failed(fmt.Errorf("timeout on queue:[%v]", qConfig.Name)) - return - } - - if err != nil { - log.Tracef("error on queue:[%v]", qConfig.Name) - if err.Error() == "EOF" { - if len(messages) > 0 { - goto HANDLE_MESSAGE - } - return - } - panic(err) - } - - HANDLE_MESSAGE: - - //update temp offset, not committed, continued reading - offset = ctx1.NextOffset.String()//TODO - - if len(messages) > 0 { - for _, pop := range messages { - typ, err := jsonparser.GetString(pop.Data, "metadata", "name") + switch typ { + case "activity": + activity, _, _, err := jsonparser.Get(pop.Data, "payload", "activity") if err != nil { panic(err) } - switch typ { - case "activity": - activity, _, _, err := jsonparser.Get(pop.Data, "payload", "activity") - if err != nil { - panic(err) - } - - err = processor.HandleActivity(activity) - } + err = processor.HandleActivity(activity) } } - if err == nil { - if offset != "" && initOffset != offset { - ok, err := queue.CommitOffset(qConfig, consumer, offset) - if !ok || err != nil { - panic(err) - } - initOffset=offset - } - } else { - log.Error(err) - } } + return nil } func (processor *ActivityProcessor) HandleActivity(activityByte []byte) error { diff --git a/plugin/elastic/metadata.go b/plugin/elastic/metadata.go index 4063e93d..68915d6f 100644 --- a/plugin/elastic/metadata.go +++ b/plugin/elastic/metadata.go @@ -10,53 +10,23 @@ import ( log "github.com/cihub/seelog" "infini.sh/framework/core/config" "infini.sh/framework/core/elastic" + "infini.sh/framework/core/errors" "infini.sh/framework/core/event" - "infini.sh/framework/core/global" "infini.sh/framework/core/orm" + "infini.sh/framework/core/param" "infini.sh/framework/core/pipeline" "infini.sh/framework/core/queue" - "infini.sh/framework/core/rotate" "infini.sh/framework/core/util" - "runtime" - - "sync" - "time" ) type MetadataProcessor struct { - config *Config - runningConfigs map[string]*queue.QueueConfig - wg sync.WaitGroup - inFlightQueueConfigs sync.Map - detectorRunning bool - id string + config *Config + id string } type Config struct { - NumOfWorkers int `config:"worker_size"` - - IdleTimeoutInSecond int `config:"idle_timeout_in_seconds"` - MaxConnectionPerHost int `config:"max_connection_per_node"` - - Queues map[string]interface{} `config:"queues,omitempty"` - - Consumer queue.ConsumerConfig `config:"consumer"` - - MaxWorkers int `config:"max_worker_size"` - - DetectActiveQueue bool `config:"detect_active_queue"` - DetectIntervalInMs int `config:"detect_interval"` - - ValidateRequest bool `config:"valid_request"` - SkipEmptyQueue bool `config:"skip_empty_queue"` - SkipOnMissingInfo bool `config:"skip_info_missing"` - - RotateConfig rotate.RotateConfig `config:"rotate"` - BulkConfig elastic.BulkProcessorConfig `config:"bulk"` - - Elasticsearch string `config:"elasticsearch,omitempty"` - - WaitingAfter []string `config:"waiting_after"` + MessageField param.ParaKey `config:"message_field"` + Elasticsearch string `config:"elasticsearch,omitempty"` } func init() { @@ -65,45 +35,21 @@ func init() { func New(c *config.Config) (pipeline.Processor, error) { cfg := Config{ - NumOfWorkers: 1, - MaxWorkers: 10, - MaxConnectionPerHost: 1, - IdleTimeoutInSecond: 5, - DetectIntervalInMs: 10000, - Queues: map[string]interface{}{}, - - Consumer: queue.ConsumerConfig{ - Group: "metadata-001", - Name: "metadata-001", - FetchMinBytes: 1, - FetchMaxBytes: 10 * 1024 * 1024, - FetchMaxMessages: 500, - EOFRetryDelayInMs: 1000, - FetchMaxWaitMs: 10000, - }, - - DetectActiveQueue: true, - ValidateRequest: false, - SkipEmptyQueue: true, - SkipOnMissingInfo: false, - RotateConfig: rotate.DefaultConfig, - BulkConfig: elastic.DefaultBulkProcessorConfig, + MessageField: "messages", } if err := c.Unpack(&cfg); err != nil { - log.Error(err) return nil, fmt.Errorf("failed to unpack the configuration of flow_runner processor: %s", err) } - runner := MetadataProcessor{ - id: util.GetUUID(), - config: &cfg, - runningConfigs: map[string]*queue.QueueConfig{}, - inFlightQueueConfigs: sync.Map{}, + if cfg.Elasticsearch == "" { + return nil, errors.New("elasticsearch config was not found in metadata processor") } - runner.wg = sync.WaitGroup{} - + runner := MetadataProcessor{ + id: util.GetUUID(), + config: &cfg, + } return &runner, nil } @@ -111,247 +57,38 @@ func (processor *MetadataProcessor) Name() string { return "metadata" } -func (processor *MetadataProcessor) Process(c *pipeline.Context) error { - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Error("error in metadata processor,", v) +func (processor *MetadataProcessor) Process(ctx *pipeline.Context) error { + + //get message from queue + obj := ctx.Get(processor.config.MessageField) + if obj != nil { + messages := obj.([]queue.Message) + log.Tracef("get %v messages from context", len(messages)) + if len(messages) == 0 { + return nil + } + for _, pop := range messages { + typ, err := jsonparser.GetString(pop.Data, "metadata", "name") + if err != nil { + panic(err) } - } - log.Trace("exit metadata processor") - }() - - //handle updates - if processor.config.DetectActiveQueue { - log.Tracef("detector running [%v]", processor.detectorRunning) - if !processor.detectorRunning { - processor.detectorRunning = true - processor.wg.Add(1) - go func(c *pipeline.Context) { - log.Tracef("init detector for active queue [%v] ", processor.id) - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Error("error in metadata processor,", v) - } - } - processor.detectorRunning = false - log.Debug("exit detector for active queue") - processor.wg.Done() - }() - - for { - if c.IsCanceled() { - return - } - - if global.Env().IsDebug { - log.Tracef("inflight queues: %v", util.MapLength(&processor.inFlightQueueConfigs)) - processor.inFlightQueueConfigs.Range(func(key, value interface{}) bool { - log.Tracef("inflight queue:%v", key) - return true - }) - } - - cfgs := queue.GetConfigByLabels(processor.config.Queues) - for _, v := range cfgs { - if c.IsCanceled() { - return - } - //if have depth and not in in flight - if queue.HasLag(v) { - _, ok := processor.inFlightQueueConfigs.Load(v.Id) - if !ok { - log.Tracef("detecting new queue: %v", v.Name) - processor.HandleQueueConfig(v, c) - } - } - } - if processor.config.DetectIntervalInMs > 0 { - time.Sleep(time.Millisecond * time.Duration(processor.config.DetectIntervalInMs)) - } - } - }(c) - } - } else { - cfgs := queue.GetConfigByLabels(processor.config.Queues) - log.Debugf("filter queue by:%v, num of queues:%v", processor.config.Queues, len(cfgs)) - for _, v := range cfgs { - log.Tracef("checking queue: %v", v) - processor.HandleQueueConfig(v, c) - } - } - - processor.wg.Wait() - - return nil -} - -func (processor *MetadataProcessor) HandleQueueConfig(v *queue.QueueConfig, c *pipeline.Context) { - - if processor.config.SkipEmptyQueue { - if !queue.HasLag(v) { - if global.Env().IsDebug { - log.Tracef("skip empty queue:[%v]", v.Name) - } - return - } - } - - elasticsearch := processor.config.Elasticsearch - if elasticsearch == "" { - log.Error("elasticsearch config was not found in metadata processor") - return - } - - meta := elastic.GetMetadata(util.ToString(elasticsearch)) - if meta == nil { - log.Debugf("metadata for [%v] is nil", elasticsearch) - return - } - - host := meta.GetActiveHost() - log.Debugf("random choose node [%v] to consume queue [%v]", host, v.Id) - processor.wg.Add(1) - - //go processor.NewBulkWorker("bulk_indexing_"+host,c, processor.bulkSizeInByte, v, host) - go processor.HandleMessage(c, v) - -} - -func (processor *MetadataProcessor) HandleMessage(ctx *pipeline.Context, qConfig *queue.QueueConfig) { - defer func() { - if !global.Env().IsDebug { - if r := recover(); r != nil { - var v string - switch r.(type) { - case error: - v = r.(error).Error() - case runtime.Error: - v = r.(runtime.Error).Error() - case string: - v = r.(string) - } - log.Errorf("error in %s processor: %v", processor.Name(), v) - } - } - processor.wg.Done() - log.Tracef("exit %s processor", processor.Name()) - }() - - key := qConfig.Id - - if processor.config.MaxWorkers > 0 && util.MapLength(&processor.inFlightQueueConfigs) > processor.config.MaxWorkers { - log.Debugf("reached max num of workers, skip init [%v]", qConfig.Name) - return - } - - var workerID = util.GetUUID() - _, exists := processor.inFlightQueueConfigs.Load(key) - if exists { - log.Errorf("queue [%v] has more then one consumer", qConfig.Id) - return - } - - processor.inFlightQueueConfigs.Store(key, workerID) - log.Debugf("starting worker:[%v], queue:[%v]", workerID, qConfig.Name) - var consumer = queue.GetOrInitConsumerConfig(qConfig.Id, processor.config.Consumer.Group, processor.config.Consumer.Name) - initOffset, _ := queue.GetOffset(qConfig, consumer) - offset := initOffset - defer func() { - log.Debugf("worker:[%v] start consume queue:[%v] offset:%v", workerID, qConfig.Id, offset) - }() - - for { - if ctx.IsCanceled() { - return - } - ctx1, messages, isTimeout, err := queue.Consume(qConfig, consumer, offset) - - if len(messages)==0{ - time.Sleep(time.Millisecond * time.Duration(500)) - } - - //if timeout{ - // log.Tracef("timeout on queue:[%v]",qConfig.Name) - // ctx.Failed() - // return - //} - - if err != nil { - log.Tracef("error on queue:[%v]", qConfig.Name) - if err.Error() == "EOF" { - if len(messages) > 0 { - goto HANDLE_MESSAGE - } - return - } - //panic(err) - if isTimeout { - time.Sleep(time.Millisecond * 1000) - } - } - - HANDLE_MESSAGE: - - //update temp offset, not committed, continued reading - offset = ctx1.NextOffset.String()//TODO - - if len(messages) > 0 { - for _, pop := range messages { - - typ, err := jsonparser.GetString(pop.Data, "metadata", "name") + switch typ { + case "index_health_change": + //err = processor.HandleIndexHealthChange(&ev) + case "index_state_change": + indexState, _, _, err := jsonparser.Get(pop.Data, "payload", "index_state") if err != nil { panic(err) } - switch typ { - case "index_health_change": - //err = processor.HandleIndexHealthChange(&ev) - case "index_state_change": - indexState, _, _, err := jsonparser.Get(pop.Data, "payload", "index_state") - if err != nil { - panic(err) - } - err = processor.HandleIndexStateChange(indexState) - case "unknown_node_status": - processor.HandleUnknownNodeStatus(pop.Data) - } - - } - } - if err == nil { - if offset != "" && initOffset != offset { - ok, err := queue.CommitOffset(qConfig, consumer, offset) - if !ok || err != nil { - panic(err) - } - initOffset=offset - } - } else { - if !isTimeout { - log.Error(err) + err = processor.HandleIndexStateChange(indexState) + case "unknown_node_status": + processor.HandleUnknownNodeStatus(pop.Data) } } } + return nil } + func (processor *MetadataProcessor) HandleIndexStateChange(indexState []byte) error { esClient := elastic.GetClient(processor.config.Elasticsearch) // save index metadata