diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 43bb92ec23..700c6cf8a3 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -308,7 +308,8 @@ struct SVnode { SSink* pSink; tsem_t canCommit; int64_t sync; - int32_t blockCount; + SRWLatch lock; + bool blocked; bool restored; tsem_t syncSem; SQHandle* pQuery; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 1ba74ac3be..4ee5c4760c 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -85,7 +85,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->state.commitTerm = info.state.commitTerm; pVnode->pTfs = pTfs; pVnode->msgCb = msgCb; - pVnode->blockCount = 0; + taosInitRWLatch(&pVnode->lock); + pVnode->blocked = false; tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&(pVnode->canCommit), 0, 1); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 7ac124fdd3..50d32f5f5e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -28,20 +28,28 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { if (vnodeIsMsgBlock(pMsg->msgType)) { const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); - pVnode->blockCount = 1; - tsem_wait(&pVnode->syncSem); + taosWLockLatch(&pVnode->lock); + if (!pVnode->blocked) { + vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + pVnode->blocked = true; + taosWUnLockLatch(&pVnode->lock); + tsem_wait(&pVnode->syncSem); + } else { + taosWUnLockLatch(&pVnode->lock); + } } } static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { if (vnodeIsMsgBlock(pMsg->msgType)) { const STraceId *trace = &pMsg->info.traceId; - if (pVnode->blockCount) { + taosWLockLatch(&pVnode->lock); + if (pVnode->blocked) { vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); - pVnode->blockCount = 0; + pVnode->blocked = false; tsem_post(&pVnode->syncSem); } + taosWUnLockLatch(&pVnode->lock); } } @@ -677,6 +685,12 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) { vDebug("vgId:%d, become follower", pVnode->config.vgId); // clear old leader resource + taosWLockLatch(&pVnode->lock); + if (pVnode->blocked) { + pVnode->blocked = false; + tsem_post(&pVnode->syncSem); + } + taosWUnLockLatch(&pVnode->lock); } static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {