diff --git a/plugin/elastic/activity.go b/plugin/elastic/activity.go index 37ff1079..ba27abf4 100644 --- a/plugin/elastic/activity.go +++ b/plugin/elastic/activity.go @@ -261,6 +261,10 @@ func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig ctx1, messages, timeout, err := queue.Consume(qConfig, consumer, offset) + if len(messages)==0{ + time.Sleep(time.Millisecond * time.Duration(500)) + } + if timeout { log.Tracef("timeout on queue:[%v]", qConfig.Name) ctx.Failed(fmt.Errorf("timeout on queue:[%v]", qConfig.Name)) diff --git a/plugin/elastic/metadata.go b/plugin/elastic/metadata.go index 678fec99..4063e93d 100644 --- a/plugin/elastic/metadata.go +++ b/plugin/elastic/metadata.go @@ -285,6 +285,11 @@ func (processor *MetadataProcessor) HandleMessage(ctx *pipeline.Context, qConfig return } ctx1, messages, isTimeout, err := queue.Consume(qConfig, consumer, offset) + + if len(messages)==0{ + time.Sleep(time.Millisecond * time.Duration(500)) + } + //if timeout{ // log.Tracef("timeout on queue:[%v]",qConfig.Name) // ctx.Failed()