refactoring processors, simplicity queue consumation
This commit is contained in:
parent
99a211cde8
commit
ad05f229af
|
@ -51,11 +51,7 @@ pipeline:
|
||||||
auto_start: true
|
auto_start: true
|
||||||
keep_running: true
|
keep_running: true
|
||||||
processor:
|
processor:
|
||||||
- metadata:
|
- consumer:
|
||||||
bulk_size_in_mb: 5
|
|
||||||
bulk_max_docs_count: 5000
|
|
||||||
fetch_max_messages: 100
|
|
||||||
elasticsearch: "$[[CLUSTER_ID]]"
|
|
||||||
queues:
|
queues:
|
||||||
type: metadata
|
type: metadata
|
||||||
category: elasticsearch
|
category: elasticsearch
|
||||||
|
@ -63,15 +59,15 @@ pipeline:
|
||||||
group: metadata
|
group: metadata
|
||||||
when:
|
when:
|
||||||
cluster_available: ["$[[CLUSTER_ID]]"]
|
cluster_available: ["$[[CLUSTER_ID]]"]
|
||||||
|
processor:
|
||||||
|
- metadata:
|
||||||
|
elasticsearch: "$[[CLUSTER_ID]]"
|
||||||
|
|
||||||
- name: activity_ingest
|
- name: activity_ingest
|
||||||
auto_start: true
|
auto_start: true
|
||||||
keep_running: true
|
keep_running: true
|
||||||
processor:
|
processor:
|
||||||
- activity:
|
- consumer:
|
||||||
bulk_size_in_mb: 5
|
|
||||||
bulk_max_docs_count: 5000
|
|
||||||
fetch_max_messages: 100
|
|
||||||
elasticsearch: "$[[CLUSTER_ID]]"
|
|
||||||
queues:
|
queues:
|
||||||
category: elasticsearch
|
category: elasticsearch
|
||||||
activity: true
|
activity: true
|
||||||
|
@ -79,6 +75,9 @@ pipeline:
|
||||||
group: activity
|
group: activity
|
||||||
when:
|
when:
|
||||||
cluster_available: ["$[[CLUSTER_ID]]"]
|
cluster_available: ["$[[CLUSTER_ID]]"]
|
||||||
|
processor:
|
||||||
|
- activity:
|
||||||
|
elasticsearch: "$[[CLUSTER_ID]]"
|
||||||
- name: migration_task_dispatcher
|
- name: migration_task_dispatcher
|
||||||
auto_start: true
|
auto_start: true
|
||||||
keep_running: true
|
keep_running: true
|
||||||
|
|
|
@ -11,26 +11,17 @@ import (
|
||||||
"github.com/segmentio/encoding/json"
|
"github.com/segmentio/encoding/json"
|
||||||
"infini.sh/framework/core/config"
|
"infini.sh/framework/core/config"
|
||||||
"infini.sh/framework/core/elastic"
|
"infini.sh/framework/core/elastic"
|
||||||
|
"infini.sh/framework/core/errors"
|
||||||
"infini.sh/framework/core/event"
|
"infini.sh/framework/core/event"
|
||||||
"infini.sh/framework/core/global"
|
|
||||||
"infini.sh/framework/core/orm"
|
"infini.sh/framework/core/orm"
|
||||||
"infini.sh/framework/core/pipeline"
|
"infini.sh/framework/core/pipeline"
|
||||||
"infini.sh/framework/core/queue"
|
"infini.sh/framework/core/queue"
|
||||||
"infini.sh/framework/core/rotate"
|
|
||||||
"infini.sh/framework/core/util"
|
"infini.sh/framework/core/util"
|
||||||
"runtime"
|
|
||||||
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ActivityProcessor struct {
|
type ActivityProcessor struct {
|
||||||
config *Config
|
config *Config
|
||||||
runningConfigs map[string]*queue.QueueConfig
|
id string
|
||||||
wg sync.WaitGroup
|
|
||||||
inFlightQueueConfigs sync.Map
|
|
||||||
detectorRunning bool
|
|
||||||
id string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -39,44 +30,21 @@ func init() {
|
||||||
|
|
||||||
func NewActivityProcessor(c *config.Config) (pipeline.Processor, error) {
|
func NewActivityProcessor(c *config.Config) (pipeline.Processor, error) {
|
||||||
cfg := Config{
|
cfg := Config{
|
||||||
NumOfWorkers: 1,
|
MessageField: "messages",
|
||||||
MaxWorkers: 10,
|
|
||||||
MaxConnectionPerHost: 1,
|
|
||||||
IdleTimeoutInSecond: 5,
|
|
||||||
DetectIntervalInMs: 10000,
|
|
||||||
Queues: map[string]interface{}{},
|
|
||||||
|
|
||||||
Consumer: queue.ConsumerConfig{
|
|
||||||
Group: "activity-001",
|
|
||||||
Name: "activity-001",
|
|
||||||
FetchMinBytes: 1,
|
|
||||||
FetchMaxBytes: 10 * 1024 * 1024,
|
|
||||||
FetchMaxMessages: 500,
|
|
||||||
EOFRetryDelayInMs: 1000,
|
|
||||||
FetchMaxWaitMs: 10000,
|
|
||||||
},
|
|
||||||
|
|
||||||
DetectActiveQueue: true,
|
|
||||||
ValidateRequest: false,
|
|
||||||
SkipEmptyQueue: true,
|
|
||||||
SkipOnMissingInfo: false,
|
|
||||||
RotateConfig: rotate.DefaultConfig,
|
|
||||||
BulkConfig: elastic.DefaultBulkProcessorConfig,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.Unpack(&cfg); err != nil {
|
if err := c.Unpack(&cfg); err != nil {
|
||||||
log.Error(err)
|
|
||||||
return nil, fmt.Errorf("failed to unpack the configuration of flow_runner processor: %s", err)
|
return nil, fmt.Errorf("failed to unpack the configuration of flow_runner processor: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runner := ActivityProcessor{
|
if cfg.Elasticsearch == "" {
|
||||||
id: util.GetUUID(),
|
return nil, errors.New("elasticsearch config was not found in metadata processor")
|
||||||
config: &cfg,
|
|
||||||
runningConfigs: map[string]*queue.QueueConfig{},
|
|
||||||
inFlightQueueConfigs: sync.Map{},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
runner.wg = sync.WaitGroup{}
|
runner := ActivityProcessor{
|
||||||
|
id: util.GetUUID(),
|
||||||
|
config: &cfg,
|
||||||
|
}
|
||||||
|
|
||||||
return &runner, nil
|
return &runner, nil
|
||||||
}
|
}
|
||||||
|
@ -85,238 +53,32 @@ func (processor *ActivityProcessor) Name() string {
|
||||||
return "activity"
|
return "activity"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (processor *ActivityProcessor) Process(c *pipeline.Context) error {
|
func (processor *ActivityProcessor) Process(ctx *pipeline.Context) error {
|
||||||
defer func() {
|
//get message from queue
|
||||||
if !global.Env().IsDebug {
|
obj := ctx.Get(processor.config.MessageField)
|
||||||
if r := recover(); r != nil {
|
if obj != nil {
|
||||||
var v string
|
messages := obj.([]queue.Message)
|
||||||
switch r.(type) {
|
log.Tracef("get %v messages from context", len(messages))
|
||||||
case error:
|
if len(messages) == 0 {
|
||||||
v = r.(error).Error()
|
return nil
|
||||||
case runtime.Error:
|
}
|
||||||
v = r.(runtime.Error).Error()
|
for _, pop := range messages {
|
||||||
case string:
|
typ, err := jsonparser.GetString(pop.Data, "metadata", "name")
|
||||||
v = r.(string)
|
if err != nil {
|
||||||
}
|
panic(err)
|
||||||
log.Error("error in activity processor,", v)
|
|
||||||
}
|
}
|
||||||
}
|
switch typ {
|
||||||
log.Trace("exit activity processor")
|
case "activity":
|
||||||
}()
|
activity, _, _, err := jsonparser.Get(pop.Data, "payload", "activity")
|
||||||
|
|
||||||
//handle updates
|
|
||||||
if processor.config.DetectActiveQueue {
|
|
||||||
log.Tracef("detector running [%v]", processor.detectorRunning)
|
|
||||||
if !processor.detectorRunning {
|
|
||||||
processor.detectorRunning = true
|
|
||||||
processor.wg.Add(1)
|
|
||||||
go func(c *pipeline.Context) {
|
|
||||||
log.Tracef("init detector for active queue [%v] ", processor.id)
|
|
||||||
defer func() {
|
|
||||||
if !global.Env().IsDebug {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
var v string
|
|
||||||
switch r.(type) {
|
|
||||||
case error:
|
|
||||||
v = r.(error).Error()
|
|
||||||
case runtime.Error:
|
|
||||||
v = r.(runtime.Error).Error()
|
|
||||||
case string:
|
|
||||||
v = r.(string)
|
|
||||||
}
|
|
||||||
log.Error("error in activity processor,", v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
processor.detectorRunning = false
|
|
||||||
log.Debug("exit detector for active queue")
|
|
||||||
processor.wg.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
if c.IsCanceled() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if global.Env().IsDebug {
|
|
||||||
log.Tracef("inflight queues: %v", util.MapLength(&processor.inFlightQueueConfigs))
|
|
||||||
processor.inFlightQueueConfigs.Range(func(key, value interface{}) bool {
|
|
||||||
log.Tracef("inflight queue:%v", key)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
cfgs := queue.GetConfigByLabels(processor.config.Queues)
|
|
||||||
for _, v := range cfgs {
|
|
||||||
if c.IsCanceled() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
//if have depth and not in in flight
|
|
||||||
if queue.HasLag(v) {
|
|
||||||
_, ok := processor.inFlightQueueConfigs.Load(v.Id)
|
|
||||||
if !ok {
|
|
||||||
log.Tracef("detecting new queue: %v", v.Name)
|
|
||||||
processor.HandleQueueConfig(v, c)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if processor.config.DetectIntervalInMs > 0 {
|
|
||||||
time.Sleep(time.Millisecond * time.Duration(processor.config.DetectIntervalInMs))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(c)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
cfgs := queue.GetConfigByLabels(processor.config.Queues)
|
|
||||||
log.Debugf("filter queue by:%v, num of queues:%v", processor.config.Queues, len(cfgs))
|
|
||||||
for _, v := range cfgs {
|
|
||||||
log.Tracef("checking queue: %v", v)
|
|
||||||
processor.HandleQueueConfig(v, c)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
processor.wg.Wait()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (processor *ActivityProcessor) HandleQueueConfig(v *queue.QueueConfig, c *pipeline.Context) {
|
|
||||||
|
|
||||||
if processor.config.SkipEmptyQueue {
|
|
||||||
if !queue.HasLag(v) {
|
|
||||||
if global.Env().IsDebug {
|
|
||||||
log.Tracef("skip empty queue:[%v]", v.Name)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
elasticsearch := processor.config.Elasticsearch
|
|
||||||
if elasticsearch == "" {
|
|
||||||
log.Error("elasticsearch config was not found in activity processor")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
meta := elastic.GetMetadata(util.ToString(elasticsearch))
|
|
||||||
if meta == nil {
|
|
||||||
log.Debugf("metadata for [%v] is nil", elasticsearch)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
host := meta.GetActiveHost()
|
|
||||||
log.Debugf("random choose node [%v] to consume queue [%v]", host, v.Id)
|
|
||||||
processor.wg.Add(1)
|
|
||||||
|
|
||||||
//go processor.NewBulkWorker("bulk_indexing_"+host,c, processor.bulkSizeInByte, v, host)
|
|
||||||
go processor.HandleMessage(c, v)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig *queue.QueueConfig) {
|
|
||||||
defer func() {
|
|
||||||
if !global.Env().IsDebug {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
var v string
|
|
||||||
switch r.(type) {
|
|
||||||
case error:
|
|
||||||
v = r.(error).Error()
|
|
||||||
case runtime.Error:
|
|
||||||
v = r.(runtime.Error).Error()
|
|
||||||
case string:
|
|
||||||
v = r.(string)
|
|
||||||
}
|
|
||||||
log.Errorf("error in %s processor: %v", processor.Name(), v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
processor.wg.Done()
|
|
||||||
log.Tracef("exit %s processor", processor.Name())
|
|
||||||
}()
|
|
||||||
|
|
||||||
key := qConfig.Id
|
|
||||||
|
|
||||||
if processor.config.MaxWorkers > 0 && util.MapLength(&processor.inFlightQueueConfigs) > processor.config.MaxWorkers {
|
|
||||||
log.Debugf("reached max num of workers, skip init [%v]", qConfig.Name)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var workerID = util.GetUUID()
|
|
||||||
_, exists := processor.inFlightQueueConfigs.Load(key)
|
|
||||||
if exists {
|
|
||||||
log.Errorf("queue [%v] has more then one consumer", qConfig.Id)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
processor.inFlightQueueConfigs.Store(key, workerID)
|
|
||||||
log.Debugf("starting worker:[%v], queue:[%v]", workerID, qConfig.Name)
|
|
||||||
var consumer = queue.GetOrInitConsumerConfig(qConfig.Id, processor.config.Consumer.Group, processor.config.Consumer.Name)
|
|
||||||
initOffset, _ := queue.GetOffset(qConfig, consumer)
|
|
||||||
offset := initOffset
|
|
||||||
defer func() {
|
|
||||||
log.Debugf("worker:[%v] start consume queue:[%v] offset:%v", workerID, qConfig.Id, offset)
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
if ctx.IsCanceled() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx1, messages, timeout, err := queue.Consume(qConfig, consumer, offset)
|
|
||||||
|
|
||||||
if len(messages)==0{
|
|
||||||
time.Sleep(time.Millisecond * time.Duration(500))
|
|
||||||
}
|
|
||||||
|
|
||||||
if timeout {
|
|
||||||
log.Tracef("timeout on queue:[%v]", qConfig.Name)
|
|
||||||
ctx.Failed(fmt.Errorf("timeout on queue:[%v]", qConfig.Name))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Tracef("error on queue:[%v]", qConfig.Name)
|
|
||||||
if err.Error() == "EOF" {
|
|
||||||
if len(messages) > 0 {
|
|
||||||
goto HANDLE_MESSAGE
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
HANDLE_MESSAGE:
|
|
||||||
|
|
||||||
//update temp offset, not committed, continued reading
|
|
||||||
offset = ctx1.NextOffset.String()//TODO
|
|
||||||
|
|
||||||
if len(messages) > 0 {
|
|
||||||
for _, pop := range messages {
|
|
||||||
typ, err := jsonparser.GetString(pop.Data, "metadata", "name")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
switch typ {
|
|
||||||
case "activity":
|
|
||||||
activity, _, _, err := jsonparser.Get(pop.Data, "payload", "activity")
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = processor.HandleActivity(activity)
|
|
||||||
}
|
|
||||||
|
|
||||||
|
err = processor.HandleActivity(activity)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err == nil {
|
|
||||||
if offset != "" && initOffset != offset {
|
|
||||||
ok, err := queue.CommitOffset(qConfig, consumer, offset)
|
|
||||||
if !ok || err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
initOffset=offset
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Error(err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (processor *ActivityProcessor) HandleActivity(activityByte []byte) error {
|
func (processor *ActivityProcessor) HandleActivity(activityByte []byte) error {
|
||||||
|
|
|
@ -10,53 +10,23 @@ import (
|
||||||
log "github.com/cihub/seelog"
|
log "github.com/cihub/seelog"
|
||||||
"infini.sh/framework/core/config"
|
"infini.sh/framework/core/config"
|
||||||
"infini.sh/framework/core/elastic"
|
"infini.sh/framework/core/elastic"
|
||||||
|
"infini.sh/framework/core/errors"
|
||||||
"infini.sh/framework/core/event"
|
"infini.sh/framework/core/event"
|
||||||
"infini.sh/framework/core/global"
|
|
||||||
"infini.sh/framework/core/orm"
|
"infini.sh/framework/core/orm"
|
||||||
|
"infini.sh/framework/core/param"
|
||||||
"infini.sh/framework/core/pipeline"
|
"infini.sh/framework/core/pipeline"
|
||||||
"infini.sh/framework/core/queue"
|
"infini.sh/framework/core/queue"
|
||||||
"infini.sh/framework/core/rotate"
|
|
||||||
"infini.sh/framework/core/util"
|
"infini.sh/framework/core/util"
|
||||||
"runtime"
|
|
||||||
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type MetadataProcessor struct {
|
type MetadataProcessor struct {
|
||||||
config *Config
|
config *Config
|
||||||
runningConfigs map[string]*queue.QueueConfig
|
id string
|
||||||
wg sync.WaitGroup
|
|
||||||
inFlightQueueConfigs sync.Map
|
|
||||||
detectorRunning bool
|
|
||||||
id string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
NumOfWorkers int `config:"worker_size"`
|
MessageField param.ParaKey `config:"message_field"`
|
||||||
|
Elasticsearch string `config:"elasticsearch,omitempty"`
|
||||||
IdleTimeoutInSecond int `config:"idle_timeout_in_seconds"`
|
|
||||||
MaxConnectionPerHost int `config:"max_connection_per_node"`
|
|
||||||
|
|
||||||
Queues map[string]interface{} `config:"queues,omitempty"`
|
|
||||||
|
|
||||||
Consumer queue.ConsumerConfig `config:"consumer"`
|
|
||||||
|
|
||||||
MaxWorkers int `config:"max_worker_size"`
|
|
||||||
|
|
||||||
DetectActiveQueue bool `config:"detect_active_queue"`
|
|
||||||
DetectIntervalInMs int `config:"detect_interval"`
|
|
||||||
|
|
||||||
ValidateRequest bool `config:"valid_request"`
|
|
||||||
SkipEmptyQueue bool `config:"skip_empty_queue"`
|
|
||||||
SkipOnMissingInfo bool `config:"skip_info_missing"`
|
|
||||||
|
|
||||||
RotateConfig rotate.RotateConfig `config:"rotate"`
|
|
||||||
BulkConfig elastic.BulkProcessorConfig `config:"bulk"`
|
|
||||||
|
|
||||||
Elasticsearch string `config:"elasticsearch,omitempty"`
|
|
||||||
|
|
||||||
WaitingAfter []string `config:"waiting_after"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -65,45 +35,21 @@ func init() {
|
||||||
|
|
||||||
func New(c *config.Config) (pipeline.Processor, error) {
|
func New(c *config.Config) (pipeline.Processor, error) {
|
||||||
cfg := Config{
|
cfg := Config{
|
||||||
NumOfWorkers: 1,
|
MessageField: "messages",
|
||||||
MaxWorkers: 10,
|
|
||||||
MaxConnectionPerHost: 1,
|
|
||||||
IdleTimeoutInSecond: 5,
|
|
||||||
DetectIntervalInMs: 10000,
|
|
||||||
Queues: map[string]interface{}{},
|
|
||||||
|
|
||||||
Consumer: queue.ConsumerConfig{
|
|
||||||
Group: "metadata-001",
|
|
||||||
Name: "metadata-001",
|
|
||||||
FetchMinBytes: 1,
|
|
||||||
FetchMaxBytes: 10 * 1024 * 1024,
|
|
||||||
FetchMaxMessages: 500,
|
|
||||||
EOFRetryDelayInMs: 1000,
|
|
||||||
FetchMaxWaitMs: 10000,
|
|
||||||
},
|
|
||||||
|
|
||||||
DetectActiveQueue: true,
|
|
||||||
ValidateRequest: false,
|
|
||||||
SkipEmptyQueue: true,
|
|
||||||
SkipOnMissingInfo: false,
|
|
||||||
RotateConfig: rotate.DefaultConfig,
|
|
||||||
BulkConfig: elastic.DefaultBulkProcessorConfig,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := c.Unpack(&cfg); err != nil {
|
if err := c.Unpack(&cfg); err != nil {
|
||||||
log.Error(err)
|
|
||||||
return nil, fmt.Errorf("failed to unpack the configuration of flow_runner processor: %s", err)
|
return nil, fmt.Errorf("failed to unpack the configuration of flow_runner processor: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
runner := MetadataProcessor{
|
if cfg.Elasticsearch == "" {
|
||||||
id: util.GetUUID(),
|
return nil, errors.New("elasticsearch config was not found in metadata processor")
|
||||||
config: &cfg,
|
|
||||||
runningConfigs: map[string]*queue.QueueConfig{},
|
|
||||||
inFlightQueueConfigs: sync.Map{},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
runner.wg = sync.WaitGroup{}
|
runner := MetadataProcessor{
|
||||||
|
id: util.GetUUID(),
|
||||||
|
config: &cfg,
|
||||||
|
}
|
||||||
return &runner, nil
|
return &runner, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,247 +57,38 @@ func (processor *MetadataProcessor) Name() string {
|
||||||
return "metadata"
|
return "metadata"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (processor *MetadataProcessor) Process(c *pipeline.Context) error {
|
func (processor *MetadataProcessor) Process(ctx *pipeline.Context) error {
|
||||||
defer func() {
|
|
||||||
if !global.Env().IsDebug {
|
//get message from queue
|
||||||
if r := recover(); r != nil {
|
obj := ctx.Get(processor.config.MessageField)
|
||||||
var v string
|
if obj != nil {
|
||||||
switch r.(type) {
|
messages := obj.([]queue.Message)
|
||||||
case error:
|
log.Tracef("get %v messages from context", len(messages))
|
||||||
v = r.(error).Error()
|
if len(messages) == 0 {
|
||||||
case runtime.Error:
|
return nil
|
||||||
v = r.(runtime.Error).Error()
|
}
|
||||||
case string:
|
for _, pop := range messages {
|
||||||
v = r.(string)
|
typ, err := jsonparser.GetString(pop.Data, "metadata", "name")
|
||||||
}
|
if err != nil {
|
||||||
log.Error("error in metadata processor,", v)
|
panic(err)
|
||||||
}
|
}
|
||||||
}
|
switch typ {
|
||||||
log.Trace("exit metadata processor")
|
case "index_health_change":
|
||||||
}()
|
//err = processor.HandleIndexHealthChange(&ev)
|
||||||
|
case "index_state_change":
|
||||||
//handle updates
|
indexState, _, _, err := jsonparser.Get(pop.Data, "payload", "index_state")
|
||||||
if processor.config.DetectActiveQueue {
|
|
||||||
log.Tracef("detector running [%v]", processor.detectorRunning)
|
|
||||||
if !processor.detectorRunning {
|
|
||||||
processor.detectorRunning = true
|
|
||||||
processor.wg.Add(1)
|
|
||||||
go func(c *pipeline.Context) {
|
|
||||||
log.Tracef("init detector for active queue [%v] ", processor.id)
|
|
||||||
defer func() {
|
|
||||||
if !global.Env().IsDebug {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
var v string
|
|
||||||
switch r.(type) {
|
|
||||||
case error:
|
|
||||||
v = r.(error).Error()
|
|
||||||
case runtime.Error:
|
|
||||||
v = r.(runtime.Error).Error()
|
|
||||||
case string:
|
|
||||||
v = r.(string)
|
|
||||||
}
|
|
||||||
log.Error("error in metadata processor,", v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
processor.detectorRunning = false
|
|
||||||
log.Debug("exit detector for active queue")
|
|
||||||
processor.wg.Done()
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
if c.IsCanceled() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if global.Env().IsDebug {
|
|
||||||
log.Tracef("inflight queues: %v", util.MapLength(&processor.inFlightQueueConfigs))
|
|
||||||
processor.inFlightQueueConfigs.Range(func(key, value interface{}) bool {
|
|
||||||
log.Tracef("inflight queue:%v", key)
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
cfgs := queue.GetConfigByLabels(processor.config.Queues)
|
|
||||||
for _, v := range cfgs {
|
|
||||||
if c.IsCanceled() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
//if have depth and not in in flight
|
|
||||||
if queue.HasLag(v) {
|
|
||||||
_, ok := processor.inFlightQueueConfigs.Load(v.Id)
|
|
||||||
if !ok {
|
|
||||||
log.Tracef("detecting new queue: %v", v.Name)
|
|
||||||
processor.HandleQueueConfig(v, c)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if processor.config.DetectIntervalInMs > 0 {
|
|
||||||
time.Sleep(time.Millisecond * time.Duration(processor.config.DetectIntervalInMs))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(c)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
cfgs := queue.GetConfigByLabels(processor.config.Queues)
|
|
||||||
log.Debugf("filter queue by:%v, num of queues:%v", processor.config.Queues, len(cfgs))
|
|
||||||
for _, v := range cfgs {
|
|
||||||
log.Tracef("checking queue: %v", v)
|
|
||||||
processor.HandleQueueConfig(v, c)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
processor.wg.Wait()
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (processor *MetadataProcessor) HandleQueueConfig(v *queue.QueueConfig, c *pipeline.Context) {
|
|
||||||
|
|
||||||
if processor.config.SkipEmptyQueue {
|
|
||||||
if !queue.HasLag(v) {
|
|
||||||
if global.Env().IsDebug {
|
|
||||||
log.Tracef("skip empty queue:[%v]", v.Name)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
elasticsearch := processor.config.Elasticsearch
|
|
||||||
if elasticsearch == "" {
|
|
||||||
log.Error("elasticsearch config was not found in metadata processor")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
meta := elastic.GetMetadata(util.ToString(elasticsearch))
|
|
||||||
if meta == nil {
|
|
||||||
log.Debugf("metadata for [%v] is nil", elasticsearch)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
host := meta.GetActiveHost()
|
|
||||||
log.Debugf("random choose node [%v] to consume queue [%v]", host, v.Id)
|
|
||||||
processor.wg.Add(1)
|
|
||||||
|
|
||||||
//go processor.NewBulkWorker("bulk_indexing_"+host,c, processor.bulkSizeInByte, v, host)
|
|
||||||
go processor.HandleMessage(c, v)
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func (processor *MetadataProcessor) HandleMessage(ctx *pipeline.Context, qConfig *queue.QueueConfig) {
|
|
||||||
defer func() {
|
|
||||||
if !global.Env().IsDebug {
|
|
||||||
if r := recover(); r != nil {
|
|
||||||
var v string
|
|
||||||
switch r.(type) {
|
|
||||||
case error:
|
|
||||||
v = r.(error).Error()
|
|
||||||
case runtime.Error:
|
|
||||||
v = r.(runtime.Error).Error()
|
|
||||||
case string:
|
|
||||||
v = r.(string)
|
|
||||||
}
|
|
||||||
log.Errorf("error in %s processor: %v", processor.Name(), v)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
processor.wg.Done()
|
|
||||||
log.Tracef("exit %s processor", processor.Name())
|
|
||||||
}()
|
|
||||||
|
|
||||||
key := qConfig.Id
|
|
||||||
|
|
||||||
if processor.config.MaxWorkers > 0 && util.MapLength(&processor.inFlightQueueConfigs) > processor.config.MaxWorkers {
|
|
||||||
log.Debugf("reached max num of workers, skip init [%v]", qConfig.Name)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var workerID = util.GetUUID()
|
|
||||||
_, exists := processor.inFlightQueueConfigs.Load(key)
|
|
||||||
if exists {
|
|
||||||
log.Errorf("queue [%v] has more then one consumer", qConfig.Id)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
processor.inFlightQueueConfigs.Store(key, workerID)
|
|
||||||
log.Debugf("starting worker:[%v], queue:[%v]", workerID, qConfig.Name)
|
|
||||||
var consumer = queue.GetOrInitConsumerConfig(qConfig.Id, processor.config.Consumer.Group, processor.config.Consumer.Name)
|
|
||||||
initOffset, _ := queue.GetOffset(qConfig, consumer)
|
|
||||||
offset := initOffset
|
|
||||||
defer func() {
|
|
||||||
log.Debugf("worker:[%v] start consume queue:[%v] offset:%v", workerID, qConfig.Id, offset)
|
|
||||||
}()
|
|
||||||
|
|
||||||
for {
|
|
||||||
if ctx.IsCanceled() {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
ctx1, messages, isTimeout, err := queue.Consume(qConfig, consumer, offset)
|
|
||||||
|
|
||||||
if len(messages)==0{
|
|
||||||
time.Sleep(time.Millisecond * time.Duration(500))
|
|
||||||
}
|
|
||||||
|
|
||||||
//if timeout{
|
|
||||||
// log.Tracef("timeout on queue:[%v]",qConfig.Name)
|
|
||||||
// ctx.Failed()
|
|
||||||
// return
|
|
||||||
//}
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
log.Tracef("error on queue:[%v]", qConfig.Name)
|
|
||||||
if err.Error() == "EOF" {
|
|
||||||
if len(messages) > 0 {
|
|
||||||
goto HANDLE_MESSAGE
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
//panic(err)
|
|
||||||
if isTimeout {
|
|
||||||
time.Sleep(time.Millisecond * 1000)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
HANDLE_MESSAGE:
|
|
||||||
|
|
||||||
//update temp offset, not committed, continued reading
|
|
||||||
offset = ctx1.NextOffset.String()//TODO
|
|
||||||
|
|
||||||
if len(messages) > 0 {
|
|
||||||
for _, pop := range messages {
|
|
||||||
|
|
||||||
typ, err := jsonparser.GetString(pop.Data, "metadata", "name")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
switch typ {
|
err = processor.HandleIndexStateChange(indexState)
|
||||||
case "index_health_change":
|
case "unknown_node_status":
|
||||||
//err = processor.HandleIndexHealthChange(&ev)
|
processor.HandleUnknownNodeStatus(pop.Data)
|
||||||
case "index_state_change":
|
|
||||||
indexState, _, _, err := jsonparser.Get(pop.Data, "payload", "index_state")
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
err = processor.HandleIndexStateChange(indexState)
|
|
||||||
case "unknown_node_status":
|
|
||||||
processor.HandleUnknownNodeStatus(pop.Data)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err == nil {
|
|
||||||
if offset != "" && initOffset != offset {
|
|
||||||
ok, err := queue.CommitOffset(qConfig, consumer, offset)
|
|
||||||
if !ok || err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
initOffset=offset
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if !isTimeout {
|
|
||||||
log.Error(err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (processor *MetadataProcessor) HandleIndexStateChange(indexState []byte) error {
|
func (processor *MetadataProcessor) HandleIndexStateChange(indexState []byte) error {
|
||||||
esClient := elastic.GetClient(processor.config.Elasticsearch)
|
esClient := elastic.GetClient(processor.config.Elasticsearch)
|
||||||
// save index metadata
|
// save index metadata
|
||||||
|
|
Loading…
Reference in New Issue