sleep when no message was found in queue
This commit is contained in:
parent
14ed65fa94
commit
241c9f57fb
|
@ -261,6 +261,10 @@ func (processor *ActivityProcessor) HandleMessage(ctx *pipeline.Context, qConfig
|
||||||
|
|
||||||
ctx1, messages, timeout, err := queue.Consume(qConfig, consumer, offset)
|
ctx1, messages, timeout, err := queue.Consume(qConfig, consumer, offset)
|
||||||
|
|
||||||
|
if len(messages)==0{
|
||||||
|
time.Sleep(time.Millisecond * time.Duration(500))
|
||||||
|
}
|
||||||
|
|
||||||
if timeout {
|
if timeout {
|
||||||
log.Tracef("timeout on queue:[%v]", qConfig.Name)
|
log.Tracef("timeout on queue:[%v]", qConfig.Name)
|
||||||
ctx.Failed(fmt.Errorf("timeout on queue:[%v]", qConfig.Name))
|
ctx.Failed(fmt.Errorf("timeout on queue:[%v]", qConfig.Name))
|
||||||
|
|
|
@ -285,6 +285,11 @@ func (processor *MetadataProcessor) HandleMessage(ctx *pipeline.Context, qConfig
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ctx1, messages, isTimeout, err := queue.Consume(qConfig, consumer, offset)
|
ctx1, messages, isTimeout, err := queue.Consume(qConfig, consumer, offset)
|
||||||
|
|
||||||
|
if len(messages)==0{
|
||||||
|
time.Sleep(time.Millisecond * time.Duration(500))
|
||||||
|
}
|
||||||
|
|
||||||
//if timeout{
|
//if timeout{
|
||||||
// log.Tracef("timeout on queue:[%v]",qConfig.Name)
|
// log.Tracef("timeout on queue:[%v]",qConfig.Name)
|
||||||
// ctx.Failed()
|
// ctx.Failed()
|
||||||
|
|
Loading…
Reference in New Issue