From 8f55e007f4f3a8f98309b7fa9df9f28666f015ca Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 14 Nov 2022 16:23:52 +0800 Subject: [PATCH 1/4] refactor(sync): add trace log --- source/libs/sync/src/syncRaftLog.c | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index bc07781e5c..21f301308e 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -239,12 +239,12 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR return -1; } + int64_t ts1 = taosGetTimestampMs(); taosThreadMutexLock(&(pData->mutex)); - int64_t tsBegin = taosGetTimestampMs(); + int64_t ts2 = taosGetTimestampMs(); code = walReadVer(pWalHandle, index); - int64_t tsEnd = taosGetTimestampMs(); - int64_t tsElapsed = tsEnd - tsBegin; + int64_t ts3 = taosGetTimestampMs(); // code = walReadVerCached(pWalHandle, index); if (code != 0) { @@ -289,6 +289,18 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR */ taosThreadMutexUnlock(&(pData->mutex)); + int64_t ts4 = taosGetTimestampMs(); + + int64_t tsElapsed = ts4 - ts1; + int64_t tsElapsedLock = ts2 - ts1; + int64_t tsElapsedRead = ts3 - ts2; + int64_t tsElapsedBuild = ts4 - ts3; + + sNTrace(pData->pSyncNode, + "read index:%" PRId64 ", elapsed:%" PRId64 ", elapsed-lock:%" PRId64 ", elapsed-read:%" PRId64 + ", elapsed-build:%" PRId64, + index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild); + return code; } From ac8eeb87afdeb1ec1e676f4887500b1eab84f50a Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 14 Nov 2022 17:19:20 +0800 Subject: [PATCH 2/4] refactor(sync): modify error log --- source/libs/sync/src/syncMain.c | 8 ++++---- source/libs/sync/src/syncRaftLog.c | 12 ++++++------ source/libs/sync/src/syncUtil.c | 15 ++++++++------- 3 files changed, 18 insertions(+), 17 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 8bccf6840a..b94d807bcc 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1806,7 +1806,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), pNode->pingTimerMS, pNode); if (code != 0) { - sNError(pNode, "failed to build ping msg"); + sError(pNode, "failed to build ping msg"); rpcFreeCont(rpcMsg.pCont); return; } @@ -1814,7 +1814,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { sNTrace(pNode, "enqueue ping msg"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr()); + sError(pNode, "failed to sync enqueue ping msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); return; } @@ -1839,7 +1839,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); if (code != 0) { - sNError(pNode, "failed to build elect msg"); + sError(pNode, "failed to build elect msg"); taosMemoryFree(pElectTimer); return; } @@ -1849,7 +1849,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr()); + sError(pNode, "failed to sync enqueue elect msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); taosMemoryFree(pElectTimer); return; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 21f301308e..b00ba3918c 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -198,9 +198,9 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr syncMeta.seqNum = pEntry->seqNum; syncMeta.term = pEntry->term; - int64_t tsWriteBegin = taosGetTimestampMs(); + int64_t tsWriteBegin = taosGetTimestampNs(); index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); - int64_t tsWriteEnd = taosGetTimestampMs(); + int64_t tsWriteEnd = taosGetTimestampNs(); int64_t tsElapsed = tsWriteEnd - tsWriteBegin; if (index < 0) { @@ -239,12 +239,12 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR return -1; } - int64_t ts1 = taosGetTimestampMs(); + int64_t ts1 = taosGetTimestampNs(); taosThreadMutexLock(&(pData->mutex)); - int64_t ts2 = taosGetTimestampMs(); + int64_t ts2 = taosGetTimestampNs(); code = walReadVer(pWalHandle, index); - int64_t ts3 = taosGetTimestampMs(); + int64_t ts3 = taosGetTimestampNs(); // code = walReadVerCached(pWalHandle, index); if (code != 0) { @@ -289,7 +289,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR */ taosThreadMutexUnlock(&(pData->mutex)); - int64_t ts4 = taosGetTimestampMs(); + int64_t ts4 = taosGetTimestampNs(); int64_t tsElapsed = ts4 - ts1; int64_t tsElapsedLock = ts2 - ts1; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index a575de7e56..b9a271ab9d 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -194,6 +194,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + int64_t currentTerm = pNode->pRaftStore->currentTerm; // save error code, otherwise it will be overwritten int32_t errCode = terrno; @@ -235,8 +236,8 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", - pNode->vgId, syncStr(pNode->state), eventLog, pNode->pRaftStore->currentTerm, pNode->commitIndex, - logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, + pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, + logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); @@ -374,9 +375,9 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); + "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 + ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); } void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { @@ -511,8 +512,8 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + ", lsend-index:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, (pMsg->prevLogIndex + 1), pMsg->commitIndex, pMsg->dataLen, s); } From 76628a52e6cfe45b6229f0ef8252f1d76b45edbe Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 14 Nov 2022 17:33:48 +0800 Subject: [PATCH 3/4] fix(sync): do not use SNError --- source/libs/sync/src/syncMain.c | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b94d807bcc..e802f60f30 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1806,7 +1806,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), pNode->pingTimerMS, pNode); if (code != 0) { - sError(pNode, "failed to build ping msg"); + sError("failed to build ping msg"); rpcFreeCont(rpcMsg.pCont); return; } @@ -1814,7 +1814,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { sNTrace(pNode, "enqueue ping msg"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { - sError(pNode, "failed to sync enqueue ping msg since %s", terrstr()); + sError("failed to sync enqueue ping msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); return; } @@ -1839,7 +1839,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); if (code != 0) { - sError(pNode, "failed to build elect msg"); + sError("failed to build elect msg"); taosMemoryFree(pElectTimer); return; } @@ -1849,7 +1849,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { - sError(pNode, "failed to sync enqueue elect msg since %s", terrstr()); + sError("failed to sync enqueue elect msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); taosMemoryFree(pElectTimer); return; @@ -1879,14 +1879,14 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { pNode->heartbeatTimerMS, pNode); if (code != 0) { - sNError(pNode, "failed to build heartbeat msg"); + sError("failed to build heartbeat msg"); return; } sNTrace(pNode, "enqueue heartbeat timer"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr()); + sError("failed to enqueue heartbeat msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); return; } @@ -1971,7 +1971,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) { sNTrace(pNode, "propose msg, type:noop"); code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to propose noop msg while enqueue since %s", terrstr()); + sError("failed to propose noop msg while enqueue since %s", terrstr()); } return code; @@ -2005,7 +2005,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { if (ths->state == TAOS_SYNC_STATE_LEADER) { int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); if (code != 0) { - sNError(ths, "append noop error"); + sError("append noop error"); return -1; } } @@ -2109,7 +2109,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncNodeFollowerCommit(ths, pMsg->fcIndex); } else { - sNError(ths, "error local cmd"); + sError("error local cmd"); } return 0; From d381b4da9331b6094807d8d39b9dbaacf0b363d0 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 14 Nov 2022 18:17:14 +0800 Subject: [PATCH 4/4] fix(sync): fix memory leak --- source/libs/sync/src/syncReplication.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 59afe814eb..6a7a2c18c1 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -72,7 +72,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh SRpcMsg rpcMsg = {0}; SyncAppendEntries* pMsg = NULL; - SSyncRaftEntry* pEntry; + SSyncRaftEntry* pEntry = NULL; int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); if (code == 0) { @@ -99,6 +99,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh } } + syncEntryDestory(pEntry); + // prepare msg ASSERT(pMsg != NULL); pMsg->srcId = pSyncNode->myRaftId;