diff --git a/api/internal/mqs/queue.go b/api/internal/mqs/queue.go index 53ad1096..4b4f5aa2 100644 --- a/api/internal/mqs/queue.go +++ b/api/internal/mqs/queue.go @@ -22,7 +22,6 @@ import ( "github.com/zeromicro/go-zero/core/threading" "k8s.io/apimachinery/pkg/util/json" "sync" - "time" ) var InsQueue *workQueue @@ -117,6 +116,20 @@ func (b *Beta) Add(item interface{}) { b.queue = append(b.queue, item) b.cond.Signal() } + +func (b *Beta) Done(item interface{}) { + b.cond.L.Lock() + defer b.cond.L.Unlock() + + b.processing.delete(item) + if b.dirty.has(item) { + b.queue = append(b.queue, item) + b.cond.Signal() + } else if b.processing.len() == 0 { + b.cond.Signal() + } +} + func (w *workQueue) Start() { w.startConsumers() w.consumerRoutines.Wait() @@ -140,16 +153,14 @@ func (w *workQueue) startConsumers() { w.consumerRoutines.Run(func() { for { item := w.Beta.Get() - println("开始消费 ") if item != nil { bytes, err := json.Marshal(item) if err != nil { return } w.consumeOne(string(bytes)) - println("开始消费3") } - time.Sleep(1 * time.Second) + w.Beta.Done(item) } })