Guard the mnode propose handshake and the vnode block flag with RW latches

mnode: SSyncMgmt gains `lock` (and `standby` becomes int8_t). Every
check-and-clear of `transId` that posts `syncSem` -- commit, reconfig,
become follower, become leader, stop -- runs under the write latch, so the
semaphore is posted at most once per pending proposal. mndInitSync
initialises the latch and resets `transId`.

vnode: `blockCount` is replaced by `blocked`, guarded by the new
`SVnode.lock`. vnodeWaitBlockMsg sets the flag under the latch and waits
only after releasing it; vnodePostBlockMsg and vnodeBecomeFollower clear
the flag and post `syncSem` under the latch.

test: basic.sim expects `select * from performance_schema.PERF_OFFSETS`
to be rejected.

Review changes relative to the submitted patch:
  * mndSyncCommitMsg took the read latch and dropped it before clearing
    `transId` and posting `syncSem`. A concurrent stop or role change could
    pass the same `transId != 0` test and post a second time. It now takes
    the write latch and holds it across compare, clear and post.
  * mndSyncPropose logs the `transId` argument instead of re-reading
    `pMgmt->transId` after the latch has been released.

Note: line breaks and indentation were reconstructed from a
whitespace-collapsed copy of this patch. If context lines do not match,
apply with `git apply --ignore-space-change`. The mndSync.c post-image
blob id on its `index` line predates the review changes.

diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h
index f72b69a7de..505a1384ef 100644
--- a/source/dnode/mnode/impl/inc/mndInt.h
+++ b/source/dnode/mnode/impl/inc/mndInt.h
@@ -87,10 +87,11 @@ typedef struct {
 typedef struct {
   tsem_t syncSem;
   int64_t sync;
-  bool standby;
   SReplica replica;
   int32_t errCode;
   int32_t transId;
+  SRWLatch lock;
+  int8_t standby;
   int8_t leaderTransferFinish;
 } SSyncMgmt;
 
diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c
index 37d5aeb62d..03e5c2b3a2 100644
--- a/source/dnode/mnode/impl/src/mndSync.c
+++ b/source/dnode/mnode/impl/src/mndSync.c
@@ -60,15 +60,19 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
     sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex);
   }
 
+  taosWLockLatch(&pMgmt->lock);
   if (transId <= 0) {
+    taosWUnLockLatch(&pMgmt->lock);
     mError("trans:%d, invalid commit msg", transId);
   } else if (transId == pMgmt->transId) {
     if (pMgmt->errCode != 0) {
       mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
     }
     pMgmt->transId = 0;
     tsem_post(&pMgmt->syncSem);
+    taosWUnLockLatch(&pMgmt->lock);
   } else {
+    taosWUnLockLatch(&pMgmt->lock);
     STrans *pTrans = mndAcquireTrans(pMnode, transId);
     if (pTrans != NULL) {
       mDebug("trans:%d, execute in mnode which not leader", transId);
@@ -115,6 +119,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
   mInfo("trans:-1, sync reconfig is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64, pMgmt->transId,
         cbMeta.code, cbMeta.index, cbMeta.term);
 
+  taosWLockLatch(&pMgmt->lock);
   if (pMgmt->transId == -1) {
     if (pMgmt->errCode != 0) {
       mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
@@ -122,6 +127,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
     pMgmt->transId = 0;
     tsem_post(&pMgmt->syncSem);
   }
+  taosWUnLockLatch(&pMgmt->lock);
 }
 
 int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
@@ -170,12 +176,24 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
   SMnode *pMnode = pFsm->data;
   mDebug("vgId:1, become follower");
 
-  // clear old leader resource
+  taosWLockLatch(&pMnode->syncMgmt.lock);
+  if (pMnode->syncMgmt.transId != 0) {
+    pMnode->syncMgmt.transId = 0;
+    tsem_post(&pMnode->syncMgmt.syncSem);
+  }
+  taosWUnLockLatch(&pMnode->syncMgmt.lock);
 }
 
 static void mndBecomeLeader(struct SSyncFSM *pFsm) {
-  SMnode *pMnode = pFsm->data;
   mDebug("vgId:1, become leader");
+  SMnode *pMnode = pFsm->data;
+
+  taosWLockLatch(&pMnode->syncMgmt.lock);
+  if (pMnode->syncMgmt.transId != 0) {
+    pMnode->syncMgmt.transId = 0;
+    tsem_post(&pMnode->syncMgmt.syncSem);
+  }
+  taosWUnLockLatch(&pMnode->syncMgmt.lock);
 }
 
 SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
@@ -202,6 +220,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
 
 int32_t mndInitSync(SMnode *pMnode) {
   SSyncMgmt *pMgmt = &pMnode->syncMgmt;
+  taosInitRWLatch(&pMgmt->lock);
+  pMgmt->transId = 0;
 
   SSyncInfo syncInfo = {.vgId = 1, .FpSendMsg = mndSyncSendMsg, .FpEqMsg = mndSyncEqMsg};
   snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
@@ -254,11 +274,14 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
   memcpy(req.pCont, pRaw, req.contLen);
 
   pMgmt->errCode = 0;
+  taosWLockLatch(&pMgmt->lock);
   pMgmt->transId = transId;
-  mTrace("trans:%d, will be proposed", pMgmt->transId);
+  taosWUnLockLatch(&pMgmt->lock);
+  mTrace("trans:%d, will be proposed", transId);
 
   const bool isWeak = false;
   int32_t code = syncPropose(pMgmt->sync, &req, isWeak);
+
   if (code == 0) {
     tsem_wait(&pMgmt->syncSem);
   } else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
@@ -286,10 +309,12 @@ void mndSyncStart(SMnode *pMnode) {
 }
 
 void mndSyncStop(SMnode *pMnode) {
+  taosWLockLatch(&pMnode->syncMgmt.lock);
   if (pMnode->syncMgmt.transId != 0) {
     pMnode->syncMgmt.transId = 0;
     tsem_post(&pMnode->syncMgmt.syncSem);
   }
+  taosWUnLockLatch(&pMnode->syncMgmt.lock);
 }
 
 bool mndIsMaster(SMnode *pMnode) {
diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index 43bb92ec23..700c6cf8a3 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -308,7 +308,8 @@ struct SVnode {
   SSink* pSink;
   tsem_t canCommit;
   int64_t sync;
-  int32_t blockCount;
+  SRWLatch lock;
+  bool blocked;
   bool restored;
   tsem_t syncSem;
   SQHandle* pQuery;
diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c
index 1ba74ac3be..4ee5c4760c 100644
--- a/source/dnode/vnode/src/vnd/vnodeOpen.c
+++ b/source/dnode/vnode/src/vnd/vnodeOpen.c
@@ -85,7 +85,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
   pVnode->state.commitTerm = info.state.commitTerm;
   pVnode->pTfs = pTfs;
   pVnode->msgCb = msgCb;
-  pVnode->blockCount = 0;
+  taosInitRWLatch(&pVnode->lock);
+  pVnode->blocked = false;
 
   tsem_init(&pVnode->syncSem, 0, 0);
   tsem_init(&(pVnode->canCommit), 0, 1);
diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c
index 7ac124fdd3..50d32f5f5e 100644
--- a/source/dnode/vnode/src/vnd/vnodeSync.c
+++ b/source/dnode/vnode/src/vnd/vnodeSync.c
@@ -28,20 +28,28 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
 static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
   if (vnodeIsMsgBlock(pMsg->msgType)) {
     const STraceId *trace = &pMsg->info.traceId;
-    vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
-    pVnode->blockCount = 1;
-    tsem_wait(&pVnode->syncSem);
+    taosWLockLatch(&pVnode->lock);
+    if (!pVnode->blocked) {
+      vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
+      pVnode->blocked = true;
+      taosWUnLockLatch(&pVnode->lock);
+      tsem_wait(&pVnode->syncSem);
+    } else {
+      taosWUnLockLatch(&pVnode->lock);
+    }
   }
 }
 
 static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
   if (vnodeIsMsgBlock(pMsg->msgType)) {
     const STraceId *trace = &pMsg->info.traceId;
-    if (pVnode->blockCount) {
+    taosWLockLatch(&pVnode->lock);
+    if (pVnode->blocked) {
       vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
-      pVnode->blockCount = 0;
+      pVnode->blocked = false;
       tsem_post(&pVnode->syncSem);
     }
+    taosWUnLockLatch(&pVnode->lock);
   }
 }
 
@@ -677,6 +685,12 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
   vDebug("vgId:%d, become follower", pVnode->config.vgId);
 
   // clear old leader resource
+  taosWLockLatch(&pVnode->lock);
+  if (pVnode->blocked) {
+    pVnode->blocked = false;
+    tsem_post(&pVnode->syncSem);
+  }
+  taosWUnLockLatch(&pVnode->lock);
 }
 
 static void vnodeBecomeLeader(struct SSyncFSM *pFsm) {
diff --git a/tests/script/tsim/show/basic.sim b/tests/script/tsim/show/basic.sim
index 162e74ea14..274476e17c 100644
--- a/tests/script/tsim/show/basic.sim
+++ b/tests/script/tsim/show/basic.sim
@@ -210,6 +210,8 @@ if $rows != 3 then
   return -1
 endi
 
+sql_error select * from performance_schema.PERF_OFFSETS;
+
 sql show create stable stb;
 if $rows != 1 then
   return -1