diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 1a92f94f5e..117a9f5e67 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -101,6 +101,7 @@ static void *mndBuildCheckpointTickMsg(int32_t *pContLen, int64_t sec) { } static void mndPullupTrans(SMnode *pMnode) { + mTrace("pullup trans msg"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -110,6 +111,7 @@ static void mndPullupTrans(SMnode *pMnode) { } static void mndPullupTtl(SMnode *pMnode) { + mTrace("pullup ttl"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen}; @@ -117,6 +119,7 @@ static void mndPullupTtl(SMnode *pMnode) { } static void mndCalMqRebalance(SMnode *pMnode) { + mTrace("calc mq rebalance"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -143,6 +146,7 @@ static void mndStreamCheckpointTick(SMnode *pMnode, int64_t sec) { } static void mndPullupTelem(SMnode *pMnode) { + mTrace("pullup telem msg"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -152,6 +156,7 @@ static void mndPullupTelem(SMnode *pMnode) { } static void mndPullupGrant(SMnode *pMnode) { + mTrace("pullup grant msg"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -162,6 +167,7 @@ static void mndPullupGrant(SMnode *pMnode) { } static void mndIncreaseUpTime(SMnode *pMnode) { + mTrace("increate uptime"); int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); if (pReq != NULL) { @@ -213,6 +219,7 @@ static void mndSetVgroupOffline(SMnode *pMnode, int32_t dnodeId, int64_t curMs) } static void mndCheckDnodeOffline(SMnode *pMnode) { + mTrace("check dnode offline"); if (mndAcquireRpc(pMnode) != 0) return; SSdb *pSdb = pMnode->pSdb; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 6295cfbf55..c96faddc4c 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -78,26 +78,27 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta SSdbRaw *pRaw = pMsg->pCont; int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); - pMgmt->errCode = pMeta->code; mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 - " role:%s raw:%p", + " role:%s raw:%p sec:%d seq:%" PRId64, transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), - pRaw); + pRaw, pMgmt->transSec, pMgmt->transSeq); - if (pMgmt->errCode == 0) { + if (pMeta->code == 0) { sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); } taosThreadMutexLock(&pMgmt->lock); + pMgmt->errCode = pMeta->code; + if (transId <= 0) { taosThreadMutexUnlock(&pMgmt->lock); - mError("trans:%d, invalid commit msg", transId); + mError("trans:%d, invalid commit msg, cache transId:%d seq:%" PRId64, transId, pMgmt->transId, pMgmt->transSeq); } else if (transId == pMgmt->transId) { if (pMgmt->errCode != 0) { mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode)); } else { - mInfo("trans:%d, is proposed and post sem", transId); + mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq); } pMgmt->transId = 0; pMgmt->transSec = 0; @@ -108,7 +109,7 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta taosThreadMutexUnlock(&pMgmt->lock); STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { - mInfo("trans:%d, execute in mnode which not leader", transId); + mInfo("trans:%d, execute in mnode which not leader or sync timeout", transId); mndTransExecute(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans); // sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); @@ -324,6 +325,7 @@ void mndCleanupSync(SMnode *pMnode) { } void mndSyncCheckTimeout(SMnode *pMnode) { + mTrace("check sync timeout"); SSyncMgmt *pMgmt = &pMnode->syncMgmt; taosThreadMutexLock(&pMgmt->lock); if (pMgmt->transId != 0) { @@ -350,7 +352,6 @@ void mndSyncCheckTimeout(SMnode *pMnode) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - pMgmt->errCode = 0; SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; if (req.contLen <= 0) return -1; @@ -360,6 +361,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { memcpy(req.pCont, pRaw, req.contLen); taosThreadMutexLock(&pMgmt->lock); + pMgmt->errCode = 0; + if (pMgmt->transId != 0) { mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); taosThreadMutexUnlock(&pMgmt->lock); @@ -406,7 +409,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { } terrno = pMgmt->errCode; - return pMgmt->errCode; + return terrno; } void mndSyncStart(SMnode *pMnode) {