From 74bfb9c5b7b5a1eec5a54071b03b0b400d16dcad Mon Sep 17 00:00:00 2001 From: zhangwei <894646498@qq.com> Date: Wed, 27 Dec 2023 19:09:45 +0800 Subject: [PATCH] =?UTF-8?q?=E9=98=9F=E5=88=97=E6=B7=BB=E5=8A=A0done?= =?UTF-8?q?=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Former-commit-id: 03494c2361dd8a1c5fc81c452b51fb9579b70a75 --- api/internal/mqs/queue.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/api/internal/mqs/queue.go b/api/internal/mqs/queue.go index 53ad1096..4b4f5aa2 100644 --- a/api/internal/mqs/queue.go +++ b/api/internal/mqs/queue.go @@ -22,7 +22,6 @@ import ( "github.com/zeromicro/go-zero/core/threading" "k8s.io/apimachinery/pkg/util/json" "sync" - "time" ) var InsQueue *workQueue @@ -117,6 +116,20 @@ func (b *Beta) Add(item interface{}) { b.queue = append(b.queue, item) b.cond.Signal() } + +func (b *Beta) Done(item interface{}) { + b.cond.L.Lock() + defer b.cond.L.Unlock() + + b.processing.delete(item) + if b.dirty.has(item) { + b.queue = append(b.queue, item) + b.cond.Signal() + } else if b.processing.len() == 0 { + b.cond.Signal() + } +} + func (w *workQueue) Start() { w.startConsumers() w.consumerRoutines.Wait() @@ -140,16 +153,14 @@ func (w *workQueue) startConsumers() { w.consumerRoutines.Run(func() { for { item := w.Beta.Get() - println("开始消费 ") if item != nil { bytes, err := json.Marshal(item) if err != nil { return } w.consumeOne(string(bytes)) - println("开始消费3") } - time.Sleep(1 * time.Second) + w.Beta.Done(item) } })