From 241c9f57fbe41fb27eed542588f9fdf8b767a615 Mon Sep 17 00:00:00 2001 From: medcl Date: Fri, 26 May 2023 12:02:39 +0800 Subject: [PATCH] sleep when no message was found in queue --- plugin/elastic/activity.go | 4 ++++ plugin/elastic/metadata.go | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/plugin/elastic/activity.go b/plugin/elastic/activity.go index 37ff1079..ba27abf4 100644 --- a/plugin/elastic/activity.go +++ b/plugin/elastic/activity.go @@ -261,6 +261,10 @@ func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig ctx1, messages, timeout, err := queue.Consume(qConfig, consumer, offset) + if len(messages)==0{ + time.Sleep(time.Millisecond * time.Duration(500)) + } + if timeout { log.Tracef("timeout on queue:[%v]", qConfig.Name) ctx.Failed(fmt.Errorf("timeout on queue:[%v]", qConfig.Name)) diff --git a/plugin/elastic/metadata.go b/plugin/elastic/metadata.go index 678fec99..4063e93d 100644 --- a/plugin/elastic/metadata.go +++ b/plugin/elastic/metadata.go @@ -285,6 +285,11 @@ func (processor *MetadataProcessor) HandleMessage(ctx *pipeline.Context, qConfig return } ctx1, messages, isTimeout, err := queue.Consume(qConfig, consumer, offset) + + if len(messages)==0{ + time.Sleep(time.Millisecond * time.Duration(500)) + } + //if timeout{ // log.Tracef("timeout on queue:[%v]",qConfig.Name) // ctx.Failed()