From f137e8395da5a7a07ca21a4c572150b24326afd5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 13 Dec 2022 10:18:00 +0800 Subject: [PATCH 1/3] refact: change SRWLatch to TdThreadMutex in mnode --- source/dnode/mnode/impl/inc/mndInt.h | 16 ++++---- source/dnode/mnode/impl/src/mndSync.c | 59 ++++++++++++++------------- 2 files changed, 39 insertions(+), 36 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index a0f3c98f83..aa18b409f4 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -84,14 +84,14 @@ typedef struct { } STelemMgmt; typedef struct { - tsem_t syncSem; - int64_t sync; - int32_t errCode; - int32_t transId; - SRWLatch lock; - int8_t selfIndex; - int8_t numOfReplicas; - SReplica replicas[TSDB_MAX_REPLICA]; + tsem_t syncSem; + int64_t sync; + int32_t errCode; + int32_t transId; + TdThreadMutex lock; + int8_t selfIndex; + int8_t numOfReplicas; + SReplica replicas[TSDB_MAX_REPLICA]; } SSyncMgmt; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 10b1e36496..3747811e89 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -89,9 +89,9 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); } - taosWLockLatch(&pMgmt->lock); + taosThreadMutexLock(&pMgmt->lock); if (transId <= 0) { - taosWUnLockLatch(&pMgmt->lock); + taosThreadMutexUnlock(&pMgmt->lock); mError("trans:%d, invalid commit msg", transId); } else if (transId == pMgmt->transId) { if (pMgmt->errCode != 0) { @@ -101,9 +101,9 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta } pMgmt->transId = 0; tsem_post(&pMgmt->syncSem); - taosWUnLockLatch(&pMgmt->lock); + taosThreadMutexUnlock(&pMgmt->lock); } else { - taosWUnLockLatch(&pMgmt->lock); + taosThreadMutexUnlock(&pMgmt->lock); STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { mInfo("trans:%d, execute in mnode which not leader", transId); @@ -198,18 +198,18 @@ int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int3 } static void mndBecomeFollower(const SSyncFSM *pFsm) { - SMnode *pMnode = pFsm->data; + SMnode *pMnode = pFsm->data; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; mInfo("vgId:1, become follower"); - taosWLockLatch(&pMnode->syncMgmt.lock); - if (pMnode->syncMgmt.transId != 0) { - mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", - pMnode->syncMgmt.transId); - pMnode->syncMgmt.transId = 0; - pMnode->syncMgmt.errCode = TSDB_CODE_SYN_NOT_LEADER; - tsem_post(&pMnode->syncMgmt.syncSem); + taosThreadMutexLock(&pMgmt->lock); + if (pMgmt->transId != 0) { + mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); + pMgmt->transId = 0; + pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; + tsem_post(&pMgmt->syncSem); } - taosWUnLockLatch(&pMnode->syncMgmt.lock); + taosThreadMutexUnlock(&pMgmt->lock); } static void mndBecomeLeader(const SSyncFSM *pFsm) { @@ -265,7 +265,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - taosInitRWLatch(&pMgmt->lock); + taosThreadMutexInit(&pMgmt->lock, NULL); pMgmt->transId = 0; SSyncInfo syncInfo = { @@ -313,6 +313,7 @@ void mndCleanupSync(SMnode *pMnode) { mInfo("mnode-sync is stopped, id:%" PRId64, pMgmt->sync); tsem_destroy(&pMgmt->syncSem); + taosThreadMutexDestroy(&pMgmt->lock); memset(pMgmt, 0, sizeof(SSyncMgmt)); } @@ -327,35 +328,35 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { if (req.pCont == NULL) return -1; memcpy(req.pCont, pRaw, req.contLen); - taosWLockLatch(&pMgmt->lock); + taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId != 0) { mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); - taosWUnLockLatch(&pMgmt->lock); + taosThreadMutexUnlock(&pMgmt->lock); terrno = TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED; return -1; } mInfo("trans:%d, will be proposed", transId); pMgmt->transId = transId; - taosWUnLockLatch(&pMgmt->lock); + taosThreadMutexUnlock(&pMgmt->lock); int32_t code = syncPropose(pMgmt->sync, &req, false); if (code == 0) { - mInfo("trans:%d, is proposing and wait sem", pMgmt->transId); + mInfo("trans:%d, is proposing and wait sem", transId); tsem_wait(&pMgmt->syncSem); } else if (code > 0) { mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId); - taosWLockLatch(&pMgmt->lock); + taosThreadMutexLock(&pMgmt->lock); pMgmt->transId = 0; - taosWUnLockLatch(&pMgmt->lock); + taosThreadMutexUnlock(&pMgmt->lock); sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); code = 0; } else { mError("trans:%d, failed to proposed since %s", transId, terrstr()); - taosWLockLatch(&pMgmt->lock); + taosThreadMutexLock(&pMgmt->lock); pMgmt->transId = 0; - taosWUnLockLatch(&pMgmt->lock); + taosThreadMutexUnlock(&pMgmt->lock); if (terrno == 0) { terrno = TSDB_CODE_APP_ERROR; } @@ -382,13 +383,15 @@ void mndSyncStart(SMnode *pMnode) { } void mndSyncStop(SMnode *pMnode) { - taosWLockLatch(&pMnode->syncMgmt.lock); - if (pMnode->syncMgmt.transId != 0) { - mInfo("vgId:1, is stopped and post sem, trans:%d", pMnode->syncMgmt.transId); - pMnode->syncMgmt.transId = 0; - tsem_post(&pMnode->syncMgmt.syncSem); + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + + taosThreadMutexLock(&pMgmt->lock); + if (pMgmt->transId != 0) { + mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId); + pMgmt->transId = 0; + tsem_post(&pMgmt->syncSem); } - taosWUnLockLatch(&pMnode->syncMgmt.lock); + taosThreadMutexUnlock(&pMgmt->lock); } bool mndIsLeader(SMnode *pMnode) { From 9da398913e91628c22bb11dda0f9b6fac13094c2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 13 Dec 2022 11:11:22 +0800 Subject: [PATCH 2/3] refact: post sem in mnode while sync timeout --- source/dnode/mnode/impl/inc/mndInt.h | 1 + source/dnode/mnode/impl/inc/mndSync.h | 1 + source/dnode/mnode/impl/src/mndMain.c | 1 + source/dnode/mnode/impl/src/mndSync.c | 35 ++++++++++++++++++++++++++- 4 files changed, 37 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index aa18b409f4..526fa142eb 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -88,6 +88,7 @@ typedef struct { int64_t sync; int32_t errCode; int32_t transId; + int32_t transSec; TdThreadMutex lock; int8_t selfIndex; int8_t numOfReplicas; diff --git a/source/dnode/mnode/impl/inc/mndSync.h b/source/dnode/mnode/impl/inc/mndSync.h index 993efbcd08..1f9a698a8a 100644 --- a/source/dnode/mnode/impl/inc/mndSync.h +++ b/source/dnode/mnode/impl/inc/mndSync.h @@ -26,6 +26,7 @@ int32_t mndInitSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode); bool mndIsLeader(SMnode *pMnode); int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId); +void mndSyncCheckTimeout(SMnode *pMnode); void mndSyncStart(SMnode *pMnode); void mndSyncStop(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 237d45bc38..e12a908d88 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -280,6 +280,7 @@ static void *mndThreadFp(void *param) { if (sec % (tsStatusInterval * 5) == 0) { mndCheckDnodeOffline(pMnode); + mndSyncCheckTimeout(pMnode); } } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3747811e89..a1805c3289 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,6 +17,8 @@ #include "mndSync.h" #include "mndTrans.h" +#define TRANS_TIMEOUT_SEC 10 + static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (pMsg == NULL || pMsg->pCont == NULL) { return -1; @@ -100,6 +102,7 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta mInfo("trans:%d, is proposed and post sem", transId); } pMgmt->transId = 0; + pMgmt->transSec = 0; tsem_post(&pMgmt->syncSem); taosThreadMutexUnlock(&pMgmt->lock); } else { @@ -206,6 +209,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) { if (pMgmt->transId != 0) { mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); pMgmt->transId = 0; + pMgmt->transSec = 0; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; tsem_post(&pMgmt->syncSem); } @@ -267,6 +271,7 @@ int32_t mndInitSync(SMnode *pMnode) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; taosThreadMutexInit(&pMgmt->lock, NULL); pMgmt->transId = 0; + pMgmt->transSec = 0; SSyncInfo syncInfo = { .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT, @@ -317,6 +322,30 @@ void mndCleanupSync(SMnode *pMnode) { memset(pMgmt, 0, sizeof(SSyncMgmt)); } +void mndSyncCheckTimeout(SMnode *pMnode) { + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + taosThreadMutexLock(&pMgmt->lock); + if (pMgmt->transId != 0) { + int32_t curSec = taosGetTimestampSec(); + int32_t delta = curSec - pMgmt->transSec; + if (delta > TRANS_TIMEOUT_SEC) { + mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, + curSec, delta); + pMgmt->transId = 0; + pMgmt->transSec = 0; + terrno = TSDB_CODE_SYN_TIMEOUT; + pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; + tsem_post(&pMgmt->syncSem); + } else { + mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, curSec, + curSec - pMgmt->transSec); + } + } else { + mTrace("check sync timeout msg, no trans waiting for confirm"); + } + taosThreadMutexUnlock(&pMgmt->lock); +} + int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; pMgmt->errCode = 0; @@ -333,11 +362,12 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); taosThreadMutexUnlock(&pMgmt->lock); terrno = TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED; - return -1; + return terrno; } mInfo("trans:%d, will be proposed", transId); pMgmt->transId = transId; + pMgmt->transSec = taosGetTimestampSec(); taosThreadMutexUnlock(&pMgmt->lock); int32_t code = syncPropose(pMgmt->sync, &req, false); @@ -348,6 +378,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId); taosThreadMutexLock(&pMgmt->lock); pMgmt->transId = 0; + pMgmt->transSec = 0; taosThreadMutexUnlock(&pMgmt->lock); sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); @@ -356,6 +387,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { mError("trans:%d, failed to proposed since %s", transId, terrstr()); taosThreadMutexLock(&pMgmt->lock); pMgmt->transId = 0; + pMgmt->transSec = 0; taosThreadMutexUnlock(&pMgmt->lock); if (terrno == 0) { terrno = TSDB_CODE_APP_ERROR; @@ -389,6 +421,7 @@ void mndSyncStop(SMnode *pMnode) { if (pMgmt->transId != 0) { mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId); pMgmt->transId = 0; + pMgmt->transSec = 0; tsem_post(&pMgmt->syncSem); } taosThreadMutexUnlock(&pMgmt->lock); From e7646660058da0d9c317ecd4b4bf37753860b807 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 13 Dec 2022 11:28:48 +0800 Subject: [PATCH 3/3] fix: return errorcode if sync stop --- source/dnode/mnode/impl/src/mndSync.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index a1805c3289..ba14f526a9 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -341,7 +341,7 @@ void mndSyncCheckTimeout(SMnode *pMnode) { curSec - pMgmt->transSec); } } else { - mTrace("check sync timeout msg, no trans waiting for confirm"); + // mTrace("check sync timeout msg, no trans waiting for confirm"); } taosThreadMutexUnlock(&pMgmt->lock); } @@ -422,6 +422,7 @@ void mndSyncStop(SMnode *pMnode) { mInfo("vgId:1, is stopped and post sem, trans:%d", pMgmt->transId); pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->errCode = TSDB_CODE_APP_IS_STOPPING; tsem_post(&pMgmt->syncSem); } taosThreadMutexUnlock(&pMgmt->lock);