From 917d1165a779bc5adc545dba91b9386242a9dd37 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 6 Feb 2023 22:49:13 +0800 Subject: [PATCH 1/3] fix: set transId to 0 while timeout or in follower state --- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 2 +- source/dnode/mnode/impl/src/mndSync.c | 8 ++++++-- source/dnode/mnode/impl/src/mndTrans.c | 12 +++++++++--- 3 files changed, 16 insertions(+), 6 deletions(-) diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 095857825d..b0810d528f 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -49,7 +49,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { pMsg->info.node = pMgmt->pMnode; const STraceId *trace = &pMsg->info.traceId; - dGTrace("msg:%p, get from mnode queue", pMsg); + dGTrace("msg:%p, get from mnode queue, type:%s", pMsg, TMSG_INFO(pMsg->msgType)); int32_t code = mndProcessRpcMsg(pMsg); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index d458ffaed4..387a141c49 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -114,7 +114,11 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta taosThreadMutexUnlock(&pMgmt->lock); STrans *pTrans = mndAcquireTrans(pMnode, transId); if (pTrans != NULL) { - mInfo("trans:%d, execute in mnode which not leader or sync timeout", transId); + mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d", + transId, pTrans->createdTime, pMgmt->transId); + pMgmt->transId = 0; + pMgmt->transSec = 0; + pMgmt->transSeq = 0; mndTransExecute(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans); // sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); @@ -372,7 +376,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { taosThreadMutexLock(&pMgmt->lock); pMgmt->errCode = 0; - if (pMgmt->transId != 0) { + if (pMgmt->transId != 0 && pMgmt->transId != transId) { mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); taosThreadMutexUnlock(&pMgmt->lock); rpcFreeCont(req.pCont); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 0b28a6eb43..15c28f8e8f 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -794,7 +794,8 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { mInfo("trans:%d, sync to other mnodes, stage:%s", pTrans->id, mndTransStr(pTrans->stage)); int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id); if (code != 0) { - mError("trans:%d, failed to sync, errno:%s code:%s", pTrans->id, terrstr(), tstrerror(code)); + mError("trans:%d, failed to sync, errno:%s code:%s createTime:%" PRId64 " saved trans:%d", pTrans->id, terrstr(), + tstrerror(code), pTrans->createdTime, pMnode->syncMgmt.transId); sdbFreeRaw(pRaw); return -1; } @@ -1495,7 +1496,11 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); } - mInfo("trans:%d, execute finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); + mInfo("trans:%d, execute finished, code:0x%x, failedTimes:%d createTime:%" PRId64, pTrans->id, pTrans->code, + pTrans->failedTimes, pTrans->createdTime); + pMnode->syncMgmt.transId = 0; + pMnode->syncMgmt.transSec = 0; + pMnode->syncMgmt.transSeq = 0; return continueExec; } @@ -1503,7 +1508,8 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; while (continueExec) { - mInfo("trans:%d, continue to execute, stage:%s", pTrans->id, mndTransStr(pTrans->stage)); + mInfo("trans:%d, continue to execute, stage:%s createTime:%" PRId64, pTrans->id, mndTransStr(pTrans->stage), + pTrans->createdTime); pTrans->lastExecTime = taosGetTimestampMs(); switch (pTrans->stage) { case TRN_STAGE_PREPARE: From 151dfea99c265409e9cef64f459826ebb67522ca Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Feb 2023 00:07:05 +0800 Subject: [PATCH 2/3] fix: restore some changes --- source/dnode/mnode/impl/src/mndSync.c | 5 +---- source/dnode/mnode/impl/src/mndTrans.c | 8 +++----- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 387a141c49..97ad541c1f 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -116,9 +116,6 @@ int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta if (pTrans != NULL) { mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d", transId, pTrans->createdTime, pMgmt->transId); - pMgmt->transId = 0; - pMgmt->transSec = 0; - pMgmt->transSeq = 0; mndTransExecute(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans); // sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); @@ -376,7 +373,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { taosThreadMutexLock(&pMgmt->lock); pMgmt->errCode = 0; - if (pMgmt->transId != 0 && pMgmt->transId != transId) { + if (pMgmt->transId != 0 /* && pMgmt->transId != transId*/) { mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); taosThreadMutexUnlock(&pMgmt->lock); rpcFreeCont(req.pCont); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 15c28f8e8f..f931ca121a 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -791,7 +791,8 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { } (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mInfo("trans:%d, sync to other mnodes, stage:%s", pTrans->id, mndTransStr(pTrans->stage)); + mInfo("trans:%d, sync to other mnodes, stage:%s createTime:%" PRId64, pTrans->id, mndTransStr(pTrans->stage), + pTrans->createdTime); int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id); if (code != 0) { mError("trans:%d, failed to sync, errno:%s code:%s createTime:%" PRId64 " saved trans:%d", pTrans->id, terrstr(), @@ -801,7 +802,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { } sdbFreeRaw(pRaw); - mInfo("trans:%d, sync finished", pTrans->id); + mInfo("trans:%d, sync finished, createTime:%" PRId64, pTrans->id, pTrans->createdTime); return 0; } @@ -1498,9 +1499,6 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { mInfo("trans:%d, execute finished, code:0x%x, failedTimes:%d createTime:%" PRId64, pTrans->id, pTrans->code, pTrans->failedTimes, pTrans->createdTime); - pMnode->syncMgmt.transId = 0; - pMnode->syncMgmt.transSec = 0; - pMnode->syncMgmt.transSeq = 0; return continueExec; } From 4d20469e2b3cfe92cf7e37e3e227e5277d102503 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 7 Feb 2023 00:12:05 +0800 Subject: [PATCH 3/3] fix: adjust log formats --- source/libs/sync/src/syncCommit.c | 2 +- source/libs/sync/src/syncMain.c | 20 ++++----- source/libs/sync/src/syncPipeline.c | 66 ++++++++++++++-------------- source/libs/sync/src/syncRaftEntry.c | 2 +- source/libs/sync/src/syncUtil.c | 2 +- 5 files changed, 46 insertions(+), 46 deletions(-) diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 67ed1e0701..6d256a735d 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -110,7 +110,7 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { SyncIndex commitIndex = indexLikely; syncNodeUpdateCommitIndex(ths, commitIndex); - sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, + sTrace("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index:%" PRId64 "", ths->vgId, ths->state, ths->raftStore.currentTerm, commitIndex); } return ths->commitIndex; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index ea22ac7bb5..803df6b662 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -85,7 +85,7 @@ int64_t syncOpen(SSyncInfo* pSyncInfo) { int32_t syncStart(int64_t rid) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { - sError("failed to acquire rid: %" PRId64 " of tsNodeReftId for pSyncNode", rid); + sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid); return -1; } @@ -756,7 +756,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); if (lastVer < commitIndex || firstVer > commitIndex + 1) { if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex)) { - sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer: %" PRId64 ", snapshotVer: %" PRId64, + sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64, pNode->vgId, terrstr(), lastVer, commitIndex); return -1; } @@ -1101,7 +1101,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { SyncIndex endIndex = pSyncNode->pLogBuf->endIndex; if (lastVer != -1 && endIndex != lastVer + 1) { terrno = TSDB_CODE_WAL_LOG_INCOMPLETE; - sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex: %" PRId64 ", lastVer: %" PRId64 "", + sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "", pSyncNode->vgId, terrstr(), endIndex - 1, lastVer); return -1; } @@ -1820,7 +1820,7 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); ASSERT(lastIndex >= 0); - sInfo("vgId:%d, become leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64 "", + sInfo("vgId:%d, become leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64 "", pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); } @@ -1839,7 +1839,7 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become candidate from follower. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, + sInfo("vgId:%d, become candidate from follower. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); sNTrace(pSyncNode, "follower to candidate"); @@ -1849,7 +1849,7 @@ void syncNodeLeader2Follower(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); syncNodeBecomeFollower(pSyncNode, "leader to follower"); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from leader. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, + sInfo("vgId:%d, become follower from leader. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); sNTrace(pSyncNode, "leader to follower"); @@ -1859,7 +1859,7 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); syncNodeBecomeFollower(pSyncNode, "candidate to follower"); SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); - sInfo("vgId:%d, become follower from candidate. term: %" PRId64 ", commit index: %" PRId64 ", last index: %" PRId64, + sInfo("vgId:%d, become follower from candidate. term:%" PRId64 ", commit index:%" PRId64 ", last index:%" PRId64, pSyncNode->vgId, pSyncNode->raftStore.currentTerm, pSyncNode->commitIndex, lastIndex); sNTrace(pSyncNode, "candidate to follower"); @@ -2299,7 +2299,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { // proceed match index, with replicating on needed SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL); - sTrace("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 + sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); @@ -2472,7 +2472,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); rpcFreeCont(rpcMsgLocalCmd.pCont); } else { - sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->currentTerm); + sTrace("vgId:%d, sync enqueue step-down msg, new-term:%" PRId64, ths->vgId, pSyncMsg->currentTerm); } } } @@ -2538,7 +2538,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); } if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { - sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(), + sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64 "", ths->vgId, terrstr(), ths->commitIndex); } } else { diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index bb3bb0d6a4..b3eb5684cf 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -132,16 +132,16 @@ SSyncRaftEntry* syncEntryBuildDummy(SyncTerm term, SyncIndex index, int32_t vgId int32_t syncLogValidateAlignmentOfCommit(SSyncNode* pNode, SyncIndex commitIndex) { SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); if (firstVer > commitIndex + 1) { - sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. firstVer: %" PRId64 - ", tsdb commit version: %" PRId64 "", + sError("vgId:%d, firstVer of WAL log greater than tsdb commit version + 1. firstVer:%" PRId64 + ", tsdb commit version:%" PRId64 "", pNode->vgId, firstVer, commitIndex); return -1; } SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); if (lastVer < commitIndex) { - sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer: %" PRId64 - ", tsdb commit version: %" PRId64 "", + sError("vgId:%d, lastVer of WAL log less than tsdb commit version. lastVer:%" PRId64 + ", tsdb commit version:%" PRId64 "", pNode->vgId, lastVer, commitIndex); return -1; } @@ -293,7 +293,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt bool inBuf = true; if (index <= pBuf->commitIndex) { - sTrace("vgId:%d, already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 + sTrace("vgId:%d, already committed. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -306,7 +306,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt } if (index - pBuf->startIndex >= pBuf->size) { - sWarn("vgId:%d, out of buffer range. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 + sWarn("vgId:%d, out of buffer range. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -314,8 +314,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt } if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) { - sWarn("vgId:%d, not ready to accept. index: %" PRId64 ", term: %" PRId64 ": prevterm: %" PRId64 - " != lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + sWarn("vgId:%d, not ready to accept. index:%" PRId64 ", term:%" PRId64 ": prevterm:%" PRId64 + " != lastmatch:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); goto _out; @@ -328,7 +328,7 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt if (pEntry->term != pExist->term) { (void)syncLogBufferRollback(pBuf, pNode, index); } else { - sTrace("vgId:%d, duplicate log entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + sTrace("vgId:%d, duplicate log entry received. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -434,7 +434,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p // increase match index pBuf->matchIndex = index; - sTrace("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64, + sTrace("vgId:%d, log buffer proceed. start index:%" PRId64 ", match index:%" PRId64 ", end index:%" PRId64, pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); // replicate on demand @@ -475,7 +475,7 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn } if (pEntry->originalRpcType == TDMT_VND_COMMIT) { - sInfo("vgId:%d, fsm execute vnode commit. index: %" PRId64 ", term: %" PRId64 "", pNode->vgId, pEntry->index, + sInfo("vgId:%d, fsm execute vnode commit. index:%" PRId64 ", term:%" PRId64 "", pNode->vgId, pEntry->index, pEntry->term); } @@ -528,7 +528,7 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm goto _out; } - sTrace("vgId:%d, commit. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role: %d, term: %" PRId64, + sTrace("vgId:%d, commit. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role:%d, term:%" PRId64, pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, term); // execute in fsm @@ -541,19 +541,19 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm // execute it if (!syncUtilUserCommit(pEntry->originalRpcType)) { - sInfo("vgId:%d, commit sync barrier. index: %" PRId64 ", term:%" PRId64 ", type: %s", vgId, pEntry->index, + sInfo("vgId:%d, commit sync barrier. index:%" PRId64 ", term:%" PRId64 ", type:%s", vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType)); } if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry, 0) != 0) { sError("vgId:%d, failed to execute sync log entry. index:%" PRId64 ", term:%" PRId64 - ", role: %d, current term: %" PRId64, + ", role:%d, current term:%" PRId64, vgId, pEntry->index, pEntry->term, role, term); goto _out; } pBuf->commitIndex = index; - sTrace("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId, + sTrace("vgId:%d, committed index:%" PRId64 ", term:%" PRId64 ", role:%d, current term:%" PRId64 "", pNode->vgId, pEntry->index, pEntry->term, role, term); if (!inBuf) { @@ -614,7 +614,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) { syncLogReplMgrReset(pMgr); - sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer: %" PRIx64, pNode->vgId, + sWarn("vgId:%d, reset sync log repl mgr since retry backoff exceeding limit. peer:%" PRIx64, pNode->vgId, pDestId->addr); return -1; } @@ -639,7 +639,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->states[pos].acked) { if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) { syncLogReplMgrReset(pMgr); - sWarn("vgId:%d, reset sync log repl mgr since stagnation. index: %" PRId64 ", peer: %" PRIx64, pNode->vgId, + sWarn("vgId:%d, reset sync log repl mgr since stagnation. index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, index, pDestId->addr); goto _out; } @@ -648,7 +648,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { bool barrier = false; if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { - sError("vgId:%d, failed to replicate sync log entry since %s. index: %" PRId64 ", dest: %" PRIx64 "", pNode->vgId, + sError("vgId:%d, failed to replicate sync log entry since %s. index:%" PRId64 ", dest:%" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); goto _out; } @@ -670,8 +670,8 @@ _out: if (retried) { pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr); SSyncLogBuffer* pBuf = pNode->pLogBuf; - sInfo("vgId:%d, resend %d sync log entries. dest: %" PRIx64 ", indexes: %" PRId64 " ..., terms: ... %" PRId64 - ", retryWaitMs: %" PRId64 ", mgr: [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 + sInfo("vgId:%d, resend %d sync log entries. dest:%" PRIx64 ", indexes:%" PRId64 " ..., terms: ... %" PRId64 + ", retryWaitMs:%" PRId64 ", mgr: [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -714,7 +714,7 @@ int32_t syncLogReplMgrProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* p } if (pMsg->success == false && pMsg->matchIndex >= pMsg->lastSendIndex) { - sWarn("vgId:%d, failed to rollback match index. peer: dnode:%d, match index: %" PRId64 ", last sent: %" PRId64, + sWarn("vgId:%d, failed to rollback match index. peer: dnode:%d, match index:%" PRId64 ", last sent:%" PRId64, pNode->vgId, DID(&destId), pMsg->matchIndex, pMsg->lastSendIndex); if (syncNodeStartSnapshot(pNode, &destId) < 0) { sError("vgId:%d, failed to start snapshot for peer dnode:%d", pNode->vgId, DID(&destId)); @@ -761,7 +761,7 @@ int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pN SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { - sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer: %" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "", + sInfo("vgId:%d, reset sync log repl mgr in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "", pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); syncLogReplMgrReset(pMgr); pMgr->peerStartTime = pMsg->startTime; @@ -774,7 +774,7 @@ int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sync SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); if (pMsg->startTime != pMgr->peerStartTime) { - sInfo("vgId:%d, reset sync log repl mgr in appendlog reply. peer: %" PRIx64 ", start time:%" PRId64 + sInfo("vgId:%d, reset sync log repl mgr in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64, pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); syncLogReplMgrReset(pMgr); @@ -815,7 +815,7 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy bool barrier = false; SyncTerm term = -1; if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { - sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, + sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; } @@ -830,7 +830,7 @@ int32_t syncLogReplMgrReplicateProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy pMgr->endIndex = index + 1; SSyncLogBuffer* pBuf = pNode->pLogBuf; - sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term: %" PRId64 ". mgr (rs:%d): [%" PRId64 + sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ". mgr (rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -860,7 +860,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) bool barrier = false; SyncTerm term = -1; if (syncLogReplMgrReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { - sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, + sError("vgId:%d, failed to replicate log entry since %s. index:%" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, pDestId->addr); return -1; } @@ -874,7 +874,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) pMgr->endIndex = index + 1; if (barrier) { - sInfo("vgId:%d, replicated sync barrier to dest: %" PRIx64 ". index: %" PRId64 ", term: %" PRId64 + sInfo("vgId:%d, replicated sync barrier to dest:%" PRIx64 ". index:%" PRId64 ", term:%" PRId64 ", repl mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex); @@ -885,7 +885,7 @@ int32_t syncLogReplMgrReplicateAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) syncLogReplMgrRetryOnNeed(pMgr, pNode); SSyncLogBuffer* pBuf = pNode->pLogBuf; - sTrace("vgId:%d, replicated %d msgs to peer: %" PRIx64 ". indexes: %" PRId64 "..., terms: ...%" PRId64 + sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64 ", mgr: (rs:%d) [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, @@ -1028,7 +1028,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex return 0; } - sInfo("vgId:%d, rollback sync log buffer. toindex: %" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 + sInfo("vgId:%d, rollback sync log buffer. toindex:%" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -1119,11 +1119,11 @@ int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); if (pEntry == NULL) { - sError("vgId:%d, failed to get raft entry for index: %" PRId64 "", pNode->vgId, index); + sError("vgId:%d, failed to get raft entry for index:%" PRId64 "", pNode->vgId, index); if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); if (pMgr) { - sInfo("vgId:%d, reset sync log repl mgr of peer: %" PRIx64 " since %s. index: %" PRId64, pNode->vgId, + sInfo("vgId:%d, reset sync log repl mgr of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, pDestId->addr, terrstr(), index); (void)syncLogReplMgrReset(pMgr); } @@ -1134,7 +1134,7 @@ int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy prevLogTerm = syncLogReplMgrGetPrevLogTerm(pMgr, pNode, index); if (prevLogTerm < 0) { - sError("vgId:%d, failed to get prev log term since %s. index: %" PRId64 "", pNode->vgId, terrstr(), index); + sError("vgId:%d, failed to get prev log term since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); goto _err; } if (pTerm) *pTerm = pEntry->term; @@ -1147,7 +1147,7 @@ int32_t syncLogReplMgrReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Sy (void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut); - sTrace("vgId:%d, replicate one msg index: %" PRId64 " term: %" PRId64 " prevterm: %" PRId64 " to dest: 0x%016" PRIx64, + sTrace("vgId:%d, replicate one msg index:%" PRId64 " term:%" PRId64 " prevterm:%" PRId64 " to dest: 0x%016" PRIx64, pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); if (!inBuf) { diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 988a86cc67..623f1b77a4 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -91,7 +91,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) void syncEntryDestroy(SSyncRaftEntry* pEntry) { if (pEntry != NULL) { - sTrace("free entry: %p", pEntry); + sTrace("free entry:%p", pEntry); taosMemoryFree(pEntry); } } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 6a50572cba..97641b8f41 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -43,7 +43,7 @@ void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) { bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) { uint32_t ipv4 = taosGetIpv4FromFqdn(pInfo->nodeFqdn); if (ipv4 == 0xFFFFFFFF || ipv4 == 1) { - sError("failed to resolve ipv4 addr, fqdn: %s", pInfo->nodeFqdn); + sError("failed to resolve ipv4 addr, fqdn:%s", pInfo->nodeFqdn); terrno = TSDB_CODE_TSC_INVALID_FQDN; return false; }