From 0a96294056997f6fa52e46e87c21de0c89ef5cd0 Mon Sep 17 00:00:00 2001 From: Simon Guan Date: Wed, 19 Mar 2025 21:44:39 +0800 Subject: [PATCH] fix: adjust raft logs (#30258) --- source/dnode/vnode/src/meta/metaEntry.c | 4 +- source/dnode/vnode/src/meta/metaEntry2.c | 4 +- source/dnode/vnode/src/meta/metaTable2.c | 16 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/dnode/vnode/src/vnd/vnodeSync.c | 23 +++-- source/libs/sync/inc/syncCommit.h | 2 +- source/libs/sync/inc/syncPipeline.h | 6 +- source/libs/sync/inc/syncUtil.h | 2 +- source/libs/sync/src/syncAppendEntries.c | 21 +++-- source/libs/sync/src/syncAppendEntriesReply.c | 8 +- source/libs/sync/src/syncCommit.c | 6 +- source/libs/sync/src/syncMain.c | 69 +++++++------- source/libs/sync/src/syncPipeline.c | 90 +++++++++++-------- source/libs/sync/src/syncRaftLog.c | 6 +- source/libs/sync/src/syncUtil.c | 2 +- 15 files changed, 146 insertions(+), 115 deletions(-) diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index d21fda5140..dfa0ea13bd 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -48,7 +48,7 @@ int meteEncodeColRefEntry(SEncoder *pCoder, const SMetaEntry *pME) { const SColRefWrapper *pw = &pME->colRef; TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pw->nCols)); TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pw->version)); - uDebug("encode cols:%d", pw->nCols); + uTrace("encode cols:%d", pw->nCols); for (int32_t i = 0; i < pw->nCols; i++) { SColRef *p = &pw->pColRef[i]; @@ -171,7 +171,7 @@ int meteEncodeColCmprEntry(SEncoder *pCoder, const SMetaEntry *pME) { const SColCmprWrapper *pw = &pME->colCmpr; TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pw->nCols)); TAOS_CHECK_RETURN(tEncodeI32v(pCoder, pw->version)); - uDebug("encode cols:%d", pw->nCols); + uTrace("encode cols:%d", pw->nCols); for (int32_t i = 0; i < pw->nCols; i++) { SColCmpr *p = &pw->pColCmpr[i]; diff --git a/source/dnode/vnode/src/meta/metaEntry2.c b/source/dnode/vnode/src/meta/metaEntry2.c index c95c1724a2..2475ce16e8 100644 --- a/source/dnode/vnode/src/meta/metaEntry2.c +++ b/source/dnode/vnode/src/meta/metaEntry2.c @@ -2515,8 +2515,8 @@ int32_t metaHandleEntry2(SMeta *pMeta, const SMetaEntry *pEntry) { if (TSDB_CODE_SUCCESS == code) { pMeta->changed = true; - metaDebug("vgId:%d, %s success, version:%" PRId64 " type:%d uid:%" PRId64 " name:%s", vgId, __func__, - pEntry->version, pEntry->type, pEntry->uid, pEntry->type > 0 ? pEntry->name : ""); + metaDebug("vgId:%d, index:%" PRId64 ", handle meta entry success, type:%d tb:%s uid:%" PRId64, vgId, pEntry->version, + pEntry->type, pEntry->type > 0 ? pEntry->name : "", pEntry->uid); } else { metaErr(vgId, code); } diff --git a/source/dnode/vnode/src/meta/metaTable2.c b/source/dnode/vnode/src/meta/metaTable2.c index a393c8eea9..abdd8bc023 100644 --- a/source/dnode/vnode/src/meta/metaTable2.c +++ b/source/dnode/vnode/src/meta/metaTable2.c @@ -381,8 +381,8 @@ static int32_t metaCreateChildTable(SMeta *pMeta, int64_t version, SVCreateTbReq // handle entry code = metaHandleEntry2(pMeta, &entry); if (TSDB_CODE_SUCCESS == code) { - metaInfo("vgId:%d, child table:%s uid %" PRId64 " suid:%" PRId64 " is created, version:%" PRId64, - TD_VID(pMeta->pVnode), pReq->name, pReq->uid, pReq->ctb.suid, version); + metaInfo("vgId:%d, index:%" PRId64 ", child table is created, tb:%s uid:%" PRId64 " suid:%" PRId64, + TD_VID(pMeta->pVnode), version, pReq->name, pReq->uid, pReq->ctb.suid); } else { metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s suid:%" PRId64 " version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, tstrerror(code), pReq->uid, pReq->name, @@ -493,8 +493,8 @@ static int32_t metaCreateNormalTable(SMeta *pMeta, int64_t version, SVCreateTbRe // handle entry code = metaHandleEntry2(pMeta, &entry); if (TSDB_CODE_SUCCESS == code) { - metaInfo("vgId:%d, normal table:%s uid %" PRId64 " is created, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->name, - pReq->uid, version); + metaInfo("vgId:%d, index:%" PRId64 ", normal table is created, tb:%s uid:%" PRId64, TD_VID(pMeta->pVnode), version, + pReq->name, pReq->uid); } else { metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, tstrerror(code), pReq->uid, pReq->name, version); @@ -557,8 +557,8 @@ static int32_t metaCreateVirtualNormalTable(SMeta *pMeta, int64_t version, SVCre // handle entry code = metaHandleEntry2(pMeta, &entry); if (TSDB_CODE_SUCCESS == code) { - metaInfo("vgId:%d, normal table:%s uid %" PRId64 " is created, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->name, - pReq->uid, version); + metaInfo("vgId:%d, index:%" PRId64 ", virtual normal table is created, tb:%s uid:%" PRId64, TD_VID(pMeta->pVnode), + version, pReq->name, pReq->uid); } else { metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, tstrerror(code), pReq->uid, pReq->name, version); @@ -625,8 +625,8 @@ static int32_t metaCreateVirtualChildTable(SMeta *pMeta, int64_t version, SVCrea // handle entry code = metaHandleEntry2(pMeta, &entry); if (TSDB_CODE_SUCCESS == code) { - metaInfo("vgId:%d, normal table:%s uid %" PRId64 " is created, version:%" PRId64, TD_VID(pMeta->pVnode), pReq->name, - pReq->uid, version); + metaInfo("vgId:%d, index:%" PRId64 ", virtual child table is created, tb:%s uid:%" PRId64, TD_VID(pMeta->pVnode), + version, pReq->name, pReq->uid); } else { metaError("vgId:%d, %s failed at %s:%d since %s, uid:%" PRId64 " name:%s version:%" PRId64, TD_VID(pMeta->pVnode), __func__, __FILE__, __LINE__, tstrerror(code), pReq->uid, pReq->name, version); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 3563b71762..f1017a3c1a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1354,7 +1354,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, } } - vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids)); + vTrace("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids)); if (tqUpdateTbUidList(pVnode->pTq, tbUids, true) < 0) { vError("vgId:%d, failed to update tbUid list since %s", TD_VID(pVnode), tstrerror(terrno)); } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 1df6558879..a6a024631a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -335,12 +335,12 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; if (vnodeIsMsgBlock(pMsg->msgType)) { - vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64 + vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64 ", blocking msg obtained sec:%d seq:%" PRId64, vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex, pVnode->blockSec, pVnode->blockSeq); } else { - vGTrace(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg, + vGDebug(&pMsg->info.traceId, "vgId:%d, msg:%p, get from vnode-apply queue, type:%s handle:%p index:%" PRId64, vgId, pMsg, TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex); } @@ -437,10 +437,10 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm pMsg->info.conn.applyIndex = pMeta->index; pMsg->info.conn.applyTerm = pMeta->term; - vGTrace(&pMsg->info.traceId, - "vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 + vGDebug(&pMsg->info.traceId, + "vgId:%d, index:%" PRId64 ", execute commit cb, fsm:%p, term:%" PRIu64 ", msg-index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s code:0x%x", - pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code, + pVnode->config.vgId, pMeta->index, pFsm, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code); int32_t code = tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); @@ -456,7 +456,11 @@ static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMet SVnode *pVnode = pFsm->data; vnodePostBlockMsg(pVnode, pMsg); - SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; + SRpcMsg rsp = { + .code = pMsg->code, + .info = pMsg->info, + }; + if (rsp.info.handle != NULL) { tmsgSendRsp(&rsp); } @@ -482,9 +486,10 @@ static SyncIndex vnodeSyncAppliedIndex(const SSyncFSM *pFSM) { static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta *pMeta) { SVnode *pVnode = pFsm->data; - vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", - pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state), - TMSG_INFO(pMsg->msgType)); + vGDebug(&pMsg->info.traceId, + "vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", + pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state), + TMSG_INFO(pMsg->msgType)); } static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { diff --git a/source/libs/sync/inc/syncCommit.h b/source/libs/sync/inc/syncCommit.h index e6b54ea86b..d78ac81a25 100644 --- a/source/libs/sync/inc/syncCommit.h +++ b/source/libs/sync/inc/syncCommit.h @@ -51,7 +51,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode); bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index); int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex); -int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely); +int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely, const STraceId *trace); int64_t syncNodeUpdateAssignedCommitIndex(SSyncNode* ths, SyncIndex assignedCommitIndex); diff --git a/source/libs/sync/inc/syncPipeline.h b/source/libs/sync/inc/syncPipeline.h index f96eb7fb88..46a568596f 100644 --- a/source/libs/sync/inc/syncPipeline.h +++ b/source/libs/sync/inc/syncPipeline.h @@ -108,8 +108,10 @@ bool syncLogBufferIsEmpty(SSyncLogBuffer* pBuf); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); -int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char *str, const SRpcMsg *pMsg); -int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex); +int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm, char* str, + const SRpcMsg* pMsg); +int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex, const STraceId* trace, + const char* src); int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode); // private diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 418b04a3cd..5c0a045fd2 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -36,7 +36,7 @@ extern "C" { #define sGFatal(trace, param, ...) do { if (sDebugFlag & DEBUG_FATAL) { sFatal(param ", QID:0x%" PRIx64 ":0x%" PRIx64, __VA_ARGS__, (trace) ? (trace)->rootId : 0, (trace) ? (trace)->msgId : 0);}} while(0) #define sGError(trace, param, ...) do { if (sDebugFlag & DEBUG_ERROR) { sError(param ", QID:0x%" PRIx64 ":0x%" PRIx64, __VA_ARGS__, (trace) ? (trace)->rootId : 0, (trace) ? (trace)->msgId : 0);}} while(0) #define sGWarn(trace, param, ...) do { if (sDebugFlag & DEBUG_WARN) { sWarn(param ", QID:0x%" PRIx64 ":0x%" PRIx64, __VA_ARGS__, (trace) ? (trace)->rootId : 0, (trace) ? (trace)->msgId : 0);}} while(0) -#define sGInfo(ptrace, aram, ...) do { if (sDebugFlag & DEBUG_INFO) { sInfo(param ", QID:0x%" PRIx64 ":0x%" PRIx64, __VA_ARGS__, (trace) ? (trace)->rootId : 0, (trace) ? (trace)->msgId : 0);}} while(0) +#define sGInfo(trace, param, ...) do { if (sDebugFlag & DEBUG_INFO) { sInfo(param ", QID:0x%" PRIx64 ":0x%" PRIx64, __VA_ARGS__, (trace) ? (trace)->rootId : 0, (trace) ? (trace)->msgId : 0);}} while(0) #define sGDebug(trace, param, ...) do { if (sDebugFlag & DEBUG_DEBUG) { sDebug(param ", QID:0x%" PRIx64 ":0x%" PRIx64, __VA_ARGS__, (trace) ? (trace)->rootId : 0, (trace) ? (trace)->msgId : 0);}} while(0) #define sLFatal(...) if (sDebugFlag & DEBUG_FATAL) { taosPrintLongString("SYN FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 8afface863..73c7ee03c1 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -112,6 +112,7 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncLogRecvAppendEntries(ths, pMsg, "build rsp error", &pRpcMsg->info.traceId); goto _IGNORE; } + rpcRsp.info.traceId = pRpcMsg->info.traceId; SyncAppendEntriesReply* pReply = rpcRsp.pCont; // prepare response msg @@ -155,7 +156,8 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { goto _IGNORE; } - sGTrace(&pRpcMsg->info.traceId, "vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64 + sGDebug(&pRpcMsg->info.traceId, + "vgId:%d, index:%" PRId64 ", recv append entries msg, term:%" PRId64 ", preLogIndex:%" PRId64 ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 " entryterm:%" PRId64, pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pEntry->term); @@ -181,22 +183,25 @@ _SEND_RESPONSE: pReply->success = true; // update commit index only after matching SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, TMIN(pMsg->commitIndex, pReply->lastSendIndex)); - sTrace("vgId:%d, update commit return index:%" PRId64, ths->vgId, returnIndex); + sGDebug(&rpcRsp.info.traceId, "vgId:%d, index:%" PRId64 ", last commit index:%" PRId64, ths->vgId, + pMsg->prevLogIndex + 1, returnIndex); } TRACE_SET_MSGID(&(rpcRsp.info.traceId), tGenIdPI64()); { - sGTrace(&rpcRsp.info.traceId, - "vgId:%d, send append reply matchIndex:%" PRId64 " term:%" PRId64 " lastSendIndex:%" PRId64 - " to dest:0x%016" PRIx64, - ths->vgId, pReply->matchIndex, pReply->term, pReply->lastSendIndex, pReply->destId.addr); + sGDebug(&rpcRsp.info.traceId, + "vgId:%d, index:%" PRId64 ", send append entries reply, matchIndex:%" PRId64 " term:%" PRId64 + " lastSendIndex:%" PRId64 " to dest addr:0x%016" PRIx64, + ths->vgId, pMsg->prevLogIndex + 1, pReply->matchIndex, pReply->term, pReply->lastSendIndex, + pReply->destId.addr); } // ack, i.e. send response TAOS_CHECK_RETURN(syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp)); // commit index, i.e. leader notice me - if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { - sError("vgId:%d, failed to commit raft fsm log since %s.", ths->vgId, terrstr()); + if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && + syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "sync-append-entries") < 0) { + sGError(&pRpcMsg->info.traceId, "vgId:%d, failed to commit raft fsm log since %s", ths->vgId, terrstr()); } if (resetElect) syncNodeResetElectTimer(ths); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 14988b0eee..35b62c52c0 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -63,7 +63,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { return TSDB_CODE_SYN_WRONG_TERM; } - sGDebug(&pRpcMsg->info.traceId, "vgId:%d, received append entries reply, srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64, + sGDebug(&pRpcMsg->info.traceId, "vgId:%d, received append entries reply, src addr:0x%" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64, pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); if (pMsg->success) { @@ -74,13 +74,13 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { // commit if needed SyncIndex indexLikely = TMIN(pMsg->matchIndex, ths->pLogBuf->matchIndex); - SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely); + SyncIndex commitIndex = syncNodeCheckCommitIndex(ths, indexLikely, &pRpcMsg->info.traceId); if (ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (commitIndex >= ths->assignedCommitIndex) { syncNodeStepDown(ths, pMsg->term, pMsg->destId); } } else { - TAOS_CHECK_RETURN(syncLogBufferCommit(ths->pLogBuf, ths, commitIndex)); + TAOS_CHECK_RETURN(syncLogBufferCommit(ths->pLogBuf, ths, commitIndex, &pRpcMsg->info.traceId, "sync-append-entries-reply")); } } @@ -89,7 +89,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (pMgr == NULL) { code = TSDB_CODE_MND_RETURN_VALUE_NULL; if (terrno != 0) code = terrno; - sError("vgId:%d, failed to get log repl mgr for src addr: 0x%016" PRIx64, ths->vgId, pMsg->srcId.addr); + sError("vgId:%d, failed to get log repl mgr for src addr:0x%" PRIx64, ths->vgId, pMsg->srcId.addr); TAOS_RETURN(code); } TAOS_CHECK_RETURN(syncLogReplProcessReply(pMgr, ths, pMsg)); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 702a51ce44..433698b764 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -81,13 +81,13 @@ int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commitIndex) { return ths->commitIndex; } -int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { +int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely, const STraceId *trace) { int32_t code = 0; if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { SyncIndex commitIndex = indexLikely; SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, commitIndex); - sTrace("vgId:%d, agreed upon, role:%d, term:%" PRId64 ", index:%" PRId64 ", return:%" PRId64, ths->vgId, ths->state, - raftStoreGetTerm(ths), commitIndex, returnIndex); + sGDebug(trace, "vgId:%d, index:%" PRId64 ", agreed upon, role:%d term:%" PRId64 " return index:%" PRId64, ths->vgId, + commitIndex, ths->state, raftStoreGetTerm(ths), returnIndex); } return ths->commitIndex; } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a0d76aade6..d0a79fb266 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -744,7 +744,7 @@ int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) { } SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) { - if (!(pSyncNode->raftCfg.configIndexCount >= 1)) { + if (pSyncNode->raftCfg.configIndexCount < 1) { sError("vgId:%d, failed get snapshot config index, configIndexCount:%d", pSyncNode->vgId, pSyncNode->raftCfg.configIndexCount); terrno = TSDB_CODE_SYN_INTERNAL_ERROR; @@ -758,8 +758,8 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho lastIndex = (pSyncNode->raftCfg.configIndexArr)[i]; } } - sTrace("vgId:%d, sync get last config index, index:%" PRId64 " lcindex:%" PRId64, pSyncNode->vgId, - snapshotLastApplyIndex, lastIndex); + sTrace("vgId:%d, index:%" PRId64 ", get last snapshot config index:%" PRId64, pSyncNode->vgId, snapshotLastApplyIndex, + lastIndex); return lastIndex; } @@ -1010,7 +1010,7 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { pData->logicClock = pSyncTimer->logicClock; pData->execTime = tsNow + pSyncTimer->timerMS; - sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:%" PRId64 " at %d", pSyncNode->vgId, pData->rid, + sTrace("vgId:%d, start hb timer, rid:%" PRId64 " addr:0x%" PRIx64 " at %d", pSyncNode->vgId, pData->rid, pData->destId.addr, pSyncTimer->timerMS); bool stopped = taosTmrResetPriority(pSyncTimer->timerCb, pSyncTimer->timerMS, (void*)(pData->rid), @@ -1474,7 +1474,7 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { sInfo("vgId:%d, restore began, and keep syncing until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE && - (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex)) < 0) { + (code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex, NULL, "restore")) < 0) { TAOS_RETURN(code); } @@ -1708,7 +1708,7 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { int32_t code = 0; (void)atomic_add_fetch_64(&pSyncNode->electTimerLogicClock, 1); bool stop = taosTmrStop(pSyncNode->pElectTimer); - sDebug("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, stop); + sTrace("vgId:%d, stop elect timer, stop:%d", pSyncNode->vgId, stop); pSyncNode->pElectTimer = NULL; return code; @@ -1821,7 +1821,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg } if (code < 0) { - sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:%" PRId64, pNode->vgId, tstrerror(code), + sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:0x%" PRIx64, pNode->vgId, tstrerror(code), epSet, DID(destRaftId), destRaftId->addr); rpcFreeCont(pMsg->pCont); } @@ -2754,7 +2754,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { return; } - sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:%" PRId64, pSyncNode->vgId, hbDataRid, + sTrace("vgId:%d, peer hb timer execution, rid:%" PRId64 " addr:0x%" PRIx64, pSyncNode->vgId, hbDataRid, pData->destId.addr); if (pSyncNode->totalReplicaNum > 1) { @@ -2790,6 +2790,7 @@ static void syncNodeEqPeerHeartbeatTimer(void* param, void* tmrId) { // send msg TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64()); + TRACE_SET_ROOTID(&(rpcMsg.info.traceId), tGenIdPI64()); sGTrace(&rpcMsg.info.traceId, "vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId))); syncLogSendHeartbeat(pSyncNode, pSyncMsg, false, timerElapsed, pData->execTime); @@ -2949,7 +2950,7 @@ void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) { } for (int32_t i = 0; i < ths->peersNum; ++i) { - sInfo("vgId:%d, %s, peersId%d, addr:%" PRId64, ths->vgId, str, i, ths->peersId[i].addr); + sInfo("vgId:%d, %s, peersId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->peersId[i].addr); } for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) { @@ -2960,7 +2961,7 @@ void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) { } for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) { - sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, ths->vgId, str, i, ths->replicasId[i].addr); + sInfo("vgId:%d, %s, replicasId%d, addr:0x%" PRIx64, ths->vgId, str, i, ths->replicasId[i].addr); } } @@ -3198,7 +3199,7 @@ void syncNodeChangeToVoter(SSyncNode* ths) { ths->replicaNum = ths->raftCfg.cfg.replicaNum; sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum); for (int32_t i = 0; i < ths->totalReplicaNum; ++i) { - sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, i, ths->replicasId[i].addr); + sDebug("vgId:%d, i:%d, replicaId.addr:0x%" PRIx64, ths->vgId, i, ths->replicasId[i].addr); } // pMatchIndex, pNextIndex, only need to change replicaNum when 1->3 @@ -3430,10 +3431,11 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry, SRpcMsg* pMsg) { _out:; // proceed match index, with replicating on needed SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append", pMsg); + const STraceId* trace = pEntry ? &pEntry->originRpcTraceId : NULL; if (pEntry != NULL) { - sGDebug(&pEntry->originRpcTraceId, - "vgId:%d, index:%" PRId64 ", append raft entry, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64 + sGDebug(trace, + "vgId:%d, index:%" PRId64 ", raft entry appended, msg:%p term:%" PRId64 " buf:[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId, pEntry->index, pMsg, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); @@ -3441,14 +3443,12 @@ _out:; if (code == 0 && ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { int64_t index = syncNodeUpdateAssignedCommitIndex(ths, matchIndex); - sGTrace(pEntry ? &pEntry->originRpcTraceId : NULL, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", - ths->vgId, index, pMsg); + sGTrace(trace, "vgId:%d, index:%" PRId64 ", update assigned commit, msg:%p", ths->vgId, index, pMsg); if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && - syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) { - sGError(pEntry ? &pEntry->originRpcTraceId : NULL, - "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId, index, pMsg, - ths->commitIndex); + syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex, trace, "append-entry") < 0) { + sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64, ths->vgId, index, + pMsg, ths->commitIndex); code = TSDB_CODE_SYN_INTERNAL_ERROR; } } @@ -3460,13 +3460,12 @@ _out:; // single replica SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, matchIndex); - sGTrace(pEntry ? &pEntry->originRpcTraceId : NULL, - "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId, matchIndex, - pMsg, returnIndex); + sGTrace(trace, "vgId:%d, index:%" PRId64 ", raft entry update commit, msg:%p return index:%" PRId64, ths->vgId, + matchIndex, pMsg, returnIndex); if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && - (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex)) < 0) { - sGError(pEntry ? &pEntry->originRpcTraceId : NULL, + (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, trace, "append-entry")) < 0) { + sGError(trace, "vgId:%d, index:%" PRId64 ", failed to commit, msg:%p commit index:%" PRId64 " return index:%" PRId64, ths->vgId, matchIndex, pMsg, ths->commitIndex, returnIndex); } @@ -3592,18 +3591,20 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { pMsgReply->privateTerm = 8864; // magic number pMsgReply->startTime = ths->startTime; pMsgReply->timeStamp = tsMs; + rpcMsg.info.traceId = pRpcMsg->info.traceId; // reply TRACE_SET_MSGID(&(rpcMsg.info.traceId), tGenIdPI64()); - sGTrace(&rpcMsg.info.traceId, "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64, + sGDebug(&rpcMsg.info.traceId, "vgId:%d, send sync-heartbeat-reply to dnode:%d term:%" PRId64 " timestamp:%" PRId64, ths->vgId, DID(&(pMsgReply->destId)), pMsgReply->term, pMsgReply->timeStamp); TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg)); if (currentTerm == 0) currentTerm = raftStoreGetTerm(ths); - sGTrace(&rpcMsg.info.traceId, - "vgId:%d, process sync-heartbeat msg from dnode:%d, cluster:%d, Msgterm:%" PRId64 " currentTerm:%" PRId64, - ths->vgId, DID(&(pMsg->srcId)), CID(&(pMsg->srcId)), pMsg->term, currentTerm); + sGDebug(&rpcMsg.info.traceId, + "vgId:%d, process sync-heartbeat msg from dnode:%d, commit-index:%" PRId64 ", cluster:%d msgTerm:%" PRId64 + " currentTerm:%" PRId64, + ths->vgId, DID(&(pMsg->srcId)), pMsg->commitIndex, CID(&(pMsg->srcId)), pMsg->term, currentTerm); if (pMsg->term > currentTerm && ths->state == TAOS_SYNC_STATE_LEARNER) { raftStoreSetTerm(ths, pMsg->term); @@ -3620,6 +3621,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (ths->state == TAOS_SYNC_STATE_FOLLOWER || ths->state == TAOS_SYNC_STATE_LEARNER) { SRpcMsg rpcMsgLocalCmd = {0}; TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId)); + rpcMsgLocalCmd.info.traceId = rpcMsg.info.traceId; SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; pSyncMsg->cmd = @@ -3633,8 +3635,9 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { sError("vgId:%d, failed to enqueue commit msg from heartbeat since %s, code:%d", ths->vgId, terrstr(), code); rpcFreeCont(rpcMsgLocalCmd.pCont); } else { - sTrace("vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId, - pMsg->commitIndex, pMsg->term); + sGTrace(&rpcMsg.info.traceId, + "vgId:%d, enqueue commit msg from heartbeat, commit-index:%" PRId64 ", term:%" PRId64, ths->vgId, + pMsg->commitIndex, pMsg->term); } } } @@ -3644,6 +3647,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER)) { SRpcMsg rpcMsgLocalCmd = {0}; TAOS_CHECK_RETURN(syncBuildLocalCmd(&rpcMsgLocalCmd, ths->vgId)); + rpcMsgLocalCmd.info.traceId = rpcMsg.info.traceId; SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; @@ -3718,9 +3722,10 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } if (pMsg->currentTerm == matchTerm) { SyncIndex returnIndex = syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); - sTrace("vgId:%d, raft entry update commit return index:%" PRId64, ths->vgId, returnIndex); + sTrace("vgId:%d, raft entry update commit, return index:%" PRId64, ths->vgId, returnIndex); } - if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { + if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && + syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex, &pRpcMsg->info.traceId, "heartbeat") < 0) { sError("vgId:%d, failed to commit raft log since %s. commit index:%" PRId64, ths->vgId, terrstr(), ths->commitIndex); } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index ead69f00ac..7235babfbb 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -526,7 +526,7 @@ int32_t syncLogStorePersist(SSyncLogStore* pLogStore, SSyncNode* pNode, SSyncRaf bool doFsync = syncLogStoreNeedFlush(pEntry, pNode->replicaNum); if ((code = pLogStore->syncLogAppendEntry(pLogStore, pEntry, doFsync)) < 0) { - sError("failed to append sync log entry since %s, index:%" PRId64 ", term:%" PRId64, tstrerror(code), + sError("failed to persist raft entry since %s, index:%" PRId64 ", term:%" PRId64, tstrerror(code), pEntry->index, pEntry->term); TAOS_RETURN(code); } @@ -725,29 +725,37 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe cbMeta.term = pEntry->term; cbMeta.currentTerm = term; cbMeta.flag = -1; + rpcMsg.info.traceId = pEntry->originRpcTraceId; int32_t num = syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); - sDebug("vgId:%d, get response info, seqNum:%" PRId64 ", num:%d", pNode->vgId, cbMeta.seqNum, num); + sGDebug(&rpcMsg.info.traceId, "vgId:%d, index:%" PRId64 ", get response info, handle:%p seq:%" PRId64 " num:%d", + pNode->vgId, pEntry->index, &rpcMsg.info, cbMeta.seqNum, num); + code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE); - sDebug("vgId:%d, fsm execute, index:%" PRId64 ", term:%" PRId64 ", type:%s, code:%d, retry:%d", pNode->vgId, - pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry); + + sGTrace(&rpcMsg.info.traceId, + "vgId:%d, index:%" PRId64 ", fsm execute, term:%" PRId64 ", type:%s, code:%d, retry:%d", pNode->vgId, + pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), code, retry); + if (retry) { taosMsleep(10); if (code == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE) { - sError("vgId:%d, will retry to execute fsm after 10ms, last error is %s, index:%" PRId64, pNode->vgId, - tstrerror(code), pEntry->index); + sGError(&rpcMsg.info.traceId, + "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId, + pEntry->index, tstrerror(code)); } else { - sDebug("vgId:%d, will retry to execute fsm after 10ms, last error is %s, index:%" PRId64, pNode->vgId, - tstrerror(code), pEntry->index); + sGTrace(&rpcMsg.info.traceId, + "vgId:%d, index:%" PRId64 ", will retry to execute fsm after 10ms, last error is %s", pNode->vgId, + pEntry->index, tstrerror(code)); } } } while (retry); _exit: if (code < 0) { - sError("vgId:%d, failed to execute fsm at line %d since %s, index:%" PRId64 ", term:%" PRId64 ", type:%s", - pNode->vgId, lino, tstrerror(code), pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType)); + sError("vgId:%d, index:%" PRId64 ", failed to execute fsm at line %d since %s, term:%" PRId64 ", type:%s", + pNode->vgId, pEntry->index, lino, tstrerror(code), pEntry->term, TMSG_INFO(pEntry->originalRpcType)); } return code; } @@ -780,7 +788,8 @@ int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) { return 0; } -int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex) { +int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex, const STraceId* trace, + const char* src) { TAOS_CHECK_RETURN(syncLogBufferValidate(pBuf)); (void)taosThreadMutexLock(&pBuf->mutex); @@ -798,13 +807,14 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm bool restoreFinishAtThisCommit = false; if (commitIndex <= pBuf->commitIndex) { - sDebug("vgId:%d, stale commit index, current:%" PRId64 ", notified:%" PRId64, vgId, pBuf->commitIndex, - commitIndex); + sGDebug(trace, "vgId:%d, stale commit index:%" PRId64 ", notified:%" PRId64, vgId, commitIndex, pBuf->commitIndex); goto _out; } - sTrace("vgId:%d, log commit, buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), role:%d, term:%" PRId64, - pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm); + sGDebug(trace, + "vgId:%d, log commit since %s, buffer:[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 + "), role:%d, term:%" PRId64, + pNode->vgId, src, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex, role, currentTerm); // execute in fsm for (int64_t index = pBuf->commitIndex + 1; index <= upperIndex; index++) { @@ -816,14 +826,16 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm // execute it if (!syncUtilUserCommit(pEntry->originalRpcType)) { - sInfo("vgId:%d, log commit sync barrier, index:%" PRId64 ", term:%" PRId64 ", type:%s", vgId, pEntry->index, - pEntry->term, TMSG_INFO(pEntry->originalRpcType)); + sGInfo(&pEntry->originRpcTraceId, + "vgId:%d, index:%" PRId64 ", log commit sync barrier, term:%" PRId64 ", type:%s", vgId, pEntry->index, + pEntry->term, TMSG_INFO(pEntry->originalRpcType)); } if ((code = syncFsmExecute(pNode, pFsm, role, currentTerm, pEntry, 0, false)) != 0) { - sError("vgId:%d, failed to execute sync log entry, index:%" PRId64 ", term:%" PRId64 - ", role:%d, current term:%" PRId64, - vgId, pEntry->index, pEntry->term, role, currentTerm); + sGError(&pEntry->originRpcTraceId, + "vgId:%d, index:%" PRId64 ", failed to execute sync log entry, term:%" PRId64 + ", role:%d, current term:%" PRId64, + vgId, pEntry->index, pEntry->term, role, currentTerm); goto _out; } pBuf->commitIndex = index; @@ -967,7 +979,7 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; if (pMgr->retryBackoff == SYNC_MAX_RETRY_BACKOFF) { syncLogReplReset(pMgr); - sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit, peer:%" PRIx64, pNode->vgId, + sWarn("vgId:%d, reset sync log repl since retry backoff exceeding limit, peer addr:0x%" PRIx64, pNode->vgId, pDestId->addr); return TSDB_CODE_OUT_OF_RANGE; } @@ -995,7 +1007,7 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->states[pos].acked) { if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) { syncLogReplReset(pMgr); - sWarn("vgId:%d, reset sync log repl since stagnation, index:%" PRId64 ", peer:%" PRIx64, pNode->vgId, index, + sWarn("vgId:%d, reset sync log repl since stagnation, index:%" PRId64 ", peer addr:0x%" PRIx64, pNode->vgId, index, pDestId->addr); code = TSDB_CODE_ACTION_IN_PROGRESS; goto _out; @@ -1005,7 +1017,7 @@ int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { bool barrier = false; if ((code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier)) < 0) { - sError("vgId:%d, failed to replicate sync log entry since %s, index:%" PRId64 ", dest:%" PRIx64, pNode->vgId, + sError("vgId:%d, failed to replicate sync log entry since %s, index:%" PRId64 ", dest addr:0x%" PRIx64, pNode->vgId, tstrerror(code), index, pDestId->addr); goto _out; } @@ -1029,7 +1041,7 @@ _out: if (retried) { pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr); SSyncLogBuffer* pBuf = pNode->pLogBuf; - sInfo("vgId:%d, resend %d sync log entries, dest:%" PRIx64 ", indexes:%" PRId64 " ..., terms: .., %" PRId64 + sInfo("vgId:%d, resend %d sync log entries, dest addr:0x%" PRIx64 ", indexes:%" PRId64 " ..., terms: .., %" PRId64 ", retryWaitMs:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex, @@ -1044,7 +1056,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn int32_t code = 0; if (pMgr->restored != false) return TSDB_CODE_SYN_INTERNAL_ERROR; - sTrace("vgId:%d, begin to recover sync log repl, peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 ", %" PRId64 + sTrace("vgId:%d, begin to recover sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 ", %" PRId64 ", %" PRId64 ") restore:%d, buffer: [%" PRId64 ", %" PRId64 ", %" PRId64 ", %" PRId64 "), msg: {lastSendIndex:%" PRId64 ", matchIndex:%" PRId64 ", fsmState:%d, success:%d, lastMatchTerm:%" PRId64 "}", @@ -1057,7 +1069,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn if (pMgr->matchIndex != 0) return TSDB_CODE_SYN_INTERNAL_ERROR; if (pMsg->matchIndex < 0) { pMgr->restored = true; - sInfo("vgId:%d, sync log repl restored, peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 + sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -1074,7 +1086,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) { pMgr->matchIndex = pMsg->matchIndex; pMgr->restored = true; - sInfo("vgId:%d, sync log repl restored, peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 + sInfo("vgId:%d, sync log repl restored, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -1110,7 +1122,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn if (pMsg->matchIndex == -1) { // first time to restore - sInfo("vgId:%d, first time to restore sync log repl, peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 + sInfo("vgId:%d, first time to restore sync log repl, peer dnode:%d (0x%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), index:%" PRId64 ", firstVer:%" PRId64 ", term:%" PRId64 ", lastMatchTerm:%" PRId64, pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, @@ -1148,7 +1160,7 @@ int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode SSyncLogBuffer* pBuf = pNode->pLogBuf; (void)taosThreadMutexLock(&pMgr->mutex); if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) { - sInfo("vgId:%d, reset sync log repl in heartbeat, peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64, + sInfo("vgId:%d, reset sync log repl in heartbeat, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64, pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); syncLogReplReset(pMgr); pMgr->peerStartTime = pMsg->startTime; @@ -1161,7 +1173,7 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp SSyncLogBuffer* pBuf = pNode->pLogBuf; (void)taosThreadMutexLock(&pMgr->mutex); if (pMsg->startTime != pMgr->peerStartTime) { - sInfo("vgId:%d, reset sync log repl in appendlog reply, peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64, + sInfo("vgId:%d, reset sync log repl in appendlog reply, peer addr:0x%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64, pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime); syncLogReplReset(pMgr); pMgr->peerStartTime = pMsg->startTime; @@ -1200,7 +1212,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde int64_t nowMs = taosGetMonoTimestampMs(); int32_t code = 0; - sTrace("vgId:%d, begin to probe peer:%" PRIx64 " with msg of index:%" PRId64 ", repl-mgr:[%" PRId64 ", %" PRId64 + sTrace("vgId:%d, begin to probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 ", repl-mgr:[%" PRId64 ", %" PRId64 ", %" PRId64 "), restored:%d", pNode->vgId, pNode->replicasId[pMgr->peerId].addr, index, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored); @@ -1215,7 +1227,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde bool barrier = false; SyncTerm term = -1; if ((code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier)) < 0) { - sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest:0x%016" PRIx64, pNode->vgId, + sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId, tstrerror(code), index, pDestId->addr); TAOS_RETURN(code); } @@ -1230,7 +1242,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde pMgr->endIndex = index + 1; SSyncLogBuffer* pBuf = pNode->pLogBuf; - sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ", repl-mgr:[%" PRId64 + sTrace("vgId:%d, probe peer addr:0x%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pDestId->addr, index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); @@ -1240,7 +1252,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (!pMgr->restored) return TSDB_CODE_SYN_INTERNAL_ERROR; - sTrace("vgId:%d, replicate log entries from end to match, repl-mgr:[%" PRId64 ", %" PRId64 ", %" PRId64 + sTrace("vgId:%d, replicate raft entries from end to match, repl-mgr:[%" PRId64 ", %" PRId64 ", %" PRId64 "), restore:%d", pNode->vgId, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pMgr->restored); @@ -1267,7 +1279,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { code = syncLogReplSendTo(pMgr, pNode, index, &term, pDestId, &barrier); if (code < 0) { - sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest:0x%016" PRIx64, pNode->vgId, + sError("vgId:%d, failed to replicate log entry since %s, index:%" PRId64 ", dest addr:0x%016" PRIx64, pNode->vgId, tstrerror(code), index, pDestId->addr); TAOS_RETURN(code); } @@ -1294,7 +1306,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode)); SSyncLogBuffer* pBuf = pNode->pLogBuf; - sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ", indexes:%" PRId64 "..., terms: ...%" PRId64 + sTrace("vgId:%d, replicated %d msgs to peer addr:0x%" PRIx64 ", indexes:%" PRId64 "..., terms: ...%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, @@ -1555,7 +1567,7 @@ int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex ind if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(pNode, pDestId); if (pMgr) { - sInfo("vgId:%d, reset sync log repl of peer:%" PRIx64 " since %s, index:%" PRId64, pNode->vgId, pDestId->addr, + sInfo("vgId:%d, reset sync log repl of peer addr:0x%" PRIx64 " since %s, index:%" PRId64, pNode->vgId, pDestId->addr, tstrerror(code), index); syncLogReplReset(pMgr); } @@ -1579,8 +1591,8 @@ int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex ind TRACE_SET_MSGID(&(msgOut.info.traceId), tGenIdPI64()); sGDebug(&msgOut.info.traceId, - "vgId:%d, replicate one msg index:%" PRId64 " term:%" PRId64 " prevterm:%" PRId64 " to dest:0x%016" PRIx64, - pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); + "vgId:%d, index:%" PRId64 ", replicate one msg to dest addr:0x%" PRIx64 ", term:%" PRId64 " prevterm:%" PRId64, + pNode->vgId, pEntry->index, pDestId->addr, pEntry->term, prevLogTerm); TAOS_CHECK_GOTO(syncNodeSendAppendEntries(pNode, pDestId, &msgOut), &lino, _err); if (!inBuf) { diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 92c98ab821..e2f2c23feb 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -249,8 +249,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr TAOS_RETURN(code); } - sNTrace(pData->pSyncNode, "index:%" PRId64 ", append raft log, type:%s origin type:%s elapsed:%" PRId64, pEntry->index, - TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); + sGDebug(&pEntry->originRpcTraceId, + "vgId:%d, index:%" PRId64 ", persist raft entry, type:%s origin type:%s elapsed:%" PRId64, + pData->pSyncNode->vgId, pEntry->index, TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), + tsElapsed); TAOS_RETURN(TSDB_CODE_SUCCESS); } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 1dd7f723a4..5fec8a3ec8 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -69,7 +69,7 @@ bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId->addr = SYNC_ADDR(pInfo); raftId->vgId = vgId; - sInfo("vgId:%d, sync addr:%" PRIu64 " is resolved, ep:%s:%u ip:%s ipv4:%u dnode:%d cluster:%" PRId64, vgId, + sInfo("vgId:%d, sync addr:0x%" PRIx64 " is resolved, ep:%s:%u ip:%s ipv4:%u dnode:%d cluster:%" PRId64, vgId, raftId->addr, pInfo->nodeFqdn, pInfo->nodePort, ipbuf, ipv4, pInfo->nodeId, pInfo->clusterId); return true; }