From 64c165317bb50fe7e63593353843c5a31b50aee2 Mon Sep 17 00:00:00 2001 From: kercylan98 Date: Mon, 8 Apr 2024 09:35:46 +0800 Subject: [PATCH] =?UTF-8?q?other:=20=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/internal/v2/reactor/reactor.go | 40 ++++++++++++++------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/server/internal/v2/reactor/reactor.go b/server/internal/v2/reactor/reactor.go index 55cd73d..ffe8d85 100644 --- a/server/internal/v2/reactor/reactor.go +++ b/server/internal/v2/reactor/reactor.go @@ -176,25 +176,7 @@ func (r *Reactor[M]) runQueue(q *queue.Queue[int, string, M]) { go func(r *Reactor[M], q *queue.Queue[int, string, M]) { defer r.wg.Done() for m := range q.Read() { - m(r.process, func(m queue.MessageWrapper[int, string, M], last bool) { - if last { - r.locationRW.RLock() - mq, exist := r.location[m.Ident()] - r.locationRW.RUnlock() - if exist { - r.locationRW.Lock() - defer r.locationRW.Unlock() - mq, exist = r.location[m.Ident()] - if exist { - delete(r.location, m.Ident()) - r.queueRW.RLock() - mq := r.queues[mq] - r.queueRW.RUnlock() - r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.Any("ident", m.Ident()), log.Any("queue", mq.Id())) - } - } - } - }) + m(r.process, r.processFinish) } }(r, q) } @@ -212,3 +194,23 @@ func (r *Reactor[M]) Close() { atomic.StoreInt32(&r.state, statusClosed) r.queueRW.Unlock() } + +func (r *Reactor[M]) processFinish(m queue.MessageWrapper[int, string, M], last bool) { + if last { + r.locationRW.RLock() + mq, exist := r.location[m.Ident()] + r.locationRW.RUnlock() + if exist { + r.locationRW.Lock() + defer r.locationRW.Unlock() + mq, exist = r.location[m.Ident()] + if exist { + delete(r.location, m.Ident()) + r.queueRW.RLock() + mq := r.queues[mq] + r.queueRW.RUnlock() + r.logger.Load().Debug("Reactor", log.String("action", "unbind"), log.Any("ident", m.Ident()), log.Any("queue", mq.Id())) + } + } + } +}