From f623837b60e31b7ca86ee227a5b4a3a6156d1134 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 14 Dec 2022 09:14:54 +0800 Subject: [PATCH] refact: adjust mnode timer --- source/dnode/mnode/impl/inc/mndInt.h | 1 + source/dnode/mnode/impl/src/mndMain.c | 3 +++ source/dnode/mnode/impl/src/mndSync.c | 28 +++++++++++++++------------ 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index 526fa142eb..785ecc2bf5 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -89,6 +89,7 @@ typedef struct { int32_t errCode; int32_t transId; int32_t transSec; + int64_t transSeq; TdThreadMutex lock; int8_t selfIndex; int8_t numOfReplicas; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index e12a908d88..1a92f94f5e 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -280,6 +280,9 @@ static void *mndThreadFp(void *param) { if (sec % (tsStatusInterval * 5) == 0) { mndCheckDnodeOffline(pMnode); + } + + if (sec % (MNODE_TIMEOUT_SEC / 2) == 0) { mndSyncCheckTimeout(pMnode); } } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index ba14f526a9..6295cfbf55 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -17,8 +17,6 @@ #include "mndSync.h" #include "mndTrans.h" -#define TRANS_TIMEOUT_SEC 10 - static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (pMsg == NULL || pMsg->pCont == NULL) { return -1; @@ -103,6 +101,7 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta } pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; tsem_post(&pMgmt->syncSem); taosThreadMutexUnlock(&pMgmt->lock); } else { @@ -210,6 +209,7 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) { mInfo("vgId:1, become follower and post sem, trans:%d, failed to propose since not leader", pMgmt->transId); pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; pMgmt->errCode = TSDB_CODE_SYN_NOT_LEADER; tsem_post(&pMgmt->syncSem); } @@ -272,6 +272,7 @@ int32_t mndInitSync(SMnode *pMnode) { taosThreadMutexInit(&pMgmt->lock, NULL); pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; SSyncInfo syncInfo = { .snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT, @@ -328,17 +329,18 @@ void mndSyncCheckTimeout(SMnode *pMnode) { if (pMgmt->transId != 0) { int32_t curSec = taosGetTimestampSec(); int32_t delta = curSec - pMgmt->transSec; - if (delta > TRANS_TIMEOUT_SEC) { - mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, - curSec, delta); + if (delta > MNODE_TIMEOUT_SEC) { + mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId, + pMgmt->transSec, curSec, delta, pMgmt->transSeq); pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; terrno = TSDB_CODE_SYN_TIMEOUT; pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT; tsem_post(&pMgmt->syncSem); } else { - mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d", pMgmt->transId, pMgmt->transSec, curSec, - curSec - pMgmt->transSec); + mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId, + pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq); } } else { // mTrace("check sync timeout msg, no trans waiting for confirm"); @@ -368,26 +370,28 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { mInfo("trans:%d, will be proposed", transId); pMgmt->transId = transId; pMgmt->transSec = taosGetTimestampSec(); - taosThreadMutexUnlock(&pMgmt->lock); - int32_t code = syncPropose(pMgmt->sync, &req, false); + int64_t seq = 0; + int32_t code = syncPropose(pMgmt->sync, &req, false, &seq); if (code == 0) { - mInfo("trans:%d, is proposing and wait sem", transId); + mInfo("trans:%d, is proposing and wait sem, seq:%" PRId64, transId, seq); + pMgmt->transSeq = seq; + taosThreadMutexUnlock(&pMgmt->lock); tsem_wait(&pMgmt->syncSem); } else if (code > 0) { mInfo("trans:%d, confirm at once since replica is 1, continue execute", transId); - taosThreadMutexLock(&pMgmt->lock); pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; taosThreadMutexUnlock(&pMgmt->lock); sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); code = 0; } else { mError("trans:%d, failed to proposed since %s", transId, terrstr()); - taosThreadMutexLock(&pMgmt->lock); pMgmt->transId = 0; pMgmt->transSec = 0; + pMgmt->transSeq = 0; taosThreadMutexUnlock(&pMgmt->lock); if (terrno == 0) { terrno = TSDB_CODE_APP_ERROR;