From 1b9387b683c03cac84f50c9fa17268d131e3dff6 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 5 Jun 2023 17:45:46 +0800 Subject: [PATCH] fix:remove tq lock in write thread --- source/dnode/vnode/src/tq/tqPush.c | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index c10b7fc51f..4048ebe3f9 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -17,20 +17,16 @@ #include "vnd.h" int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); - - taosWLockLatch(&pTq->lock); - if (taosHashGetSize(pTq->pPushMgr) > 0) { - SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME_PUSH}; - msg.pCont = rpcMallocCont(sizeof(SMsgHead)); - msg.contLen = sizeof(SMsgHead); - SMsgHead *pHead = msg.pCont; - pHead->vgId = vgId; - pHead->contLen = msg.contLen; - tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); + if (taosHashGetSize(pTq->pPushMgr) <= 0) { + return 0; } - // unlock - taosWUnLockLatch(&pTq->lock); + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME_PUSH}; + msg.pCont = rpcMallocCont(sizeof(SMsgHead)); + msg.contLen = sizeof(SMsgHead); + SMsgHead *pHead = msg.pCont; + pHead->vgId = TD_VID(pTq->pVnode); + pHead->contLen = msg.contLen; + tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); return 0; }