From 51861bb82ba07047939dff773318914b826448cf Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 14 Nov 2022 14:20:13 +0800 Subject: [PATCH 1/2] refactor(sync): delete some code --- source/libs/sync/src/syncAppendEntriesReply.c | 37 ------------------- 1 file changed, 37 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index ea4dee64f1..2e22ac98a5 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -39,43 +39,6 @@ // /\ UNCHANGED <> // -// only start once -static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, - SyncAppendEntriesReply* pMsg) { - if (beginIndex > endIndex) { - sNError(ths, "snapshot param error, start:%" PRId64 ", end:%" PRId64, beginIndex, endIndex); - return; - } - - // get sender - SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); - ASSERT(pSender != NULL); - - if (snapshotSenderIsStart(pSender)) { - sSError(pSender, "snapshot sender already start"); - return; - } - - SSnapshot snapshot = { - .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID}; - void* pReader = NULL; - SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex}; - int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); - ASSERT(code == 0); - -#if 0 - if (pMsg->privateTerm < pSender->privateTerm) { - ASSERT(pReader != NULL); - snapshotSenderStart(pSender, readerParam, snapshot, pReader); - - } else { - if (pReader != NULL) { - ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader); - } - } -#endif -} - int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { int32_t ret = 0; From 9b58176c5872175082470264a27d9e1b7c2cff35 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 14 Nov 2022 15:56:34 +0800 Subject: [PATCH 2/2] fix(sync): fix AddressSanitizer error: TD-20372 --- source/libs/sync/src/syncMain.c | 5 ++++- source/libs/sync/src/syncRaftLog.c | 13 +++++++++++-- source/libs/sync/src/syncReplication.c | 4 +++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7ed90fb140..8bccf6840a 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -385,7 +385,7 @@ bool syncIsReadyForRead(int64_t rid) { if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { SSyncRaftEntry* pEntry = NULL; int32_t code = pSyncNode->pLogStore->syncLogGetEntry( - pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); + pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); if (code == 0 && pEntry != NULL) { if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { ready = true; @@ -1832,6 +1832,9 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { SElectTimer* pElectTimer = param; SSyncNode* pNode = pElectTimer->pSyncNode; + if (pNode == NULL) return; + if (pNode->syncEqMsg == NULL) return; + SRpcMsg rpcMsg = {0}; int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 7e4b18ab88..bc07781e5c 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -197,7 +197,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr syncMeta.isWeek = pEntry->isWeak; syncMeta.seqNum = pEntry->seqNum; syncMeta.term = pEntry->term; + + int64_t tsWriteBegin = taosGetTimestampMs(); index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); + int64_t tsWriteEnd = taosGetTimestampMs(); + int64_t tsElapsed = tsWriteEnd - tsWriteBegin; + if (index < 0) { int32_t err = terrno; const char* errStr = tstrerror(err); @@ -210,8 +215,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr } pEntry->index = index; - sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s", pEntry->index, - TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); + sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, + TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); return 0; } @@ -236,7 +241,11 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR taosThreadMutexLock(&(pData->mutex)); + int64_t tsBegin = taosGetTimestampMs(); code = walReadVer(pWalHandle, index); + int64_t tsEnd = taosGetTimestampMs(); + int64_t tsElapsed = tsEnd - tsBegin; + // code = walReadVerCached(pWalHandle, index); if (code != 0) { int32_t err = terrno; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 25d6474f67..59afe814eb 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -153,14 +153,16 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI // save index, otherwise pMsg will be free by rpc SyncIndex saveLastSendIndex = pState->lastSendIndex; + bool update = false; if (pMsg->dataLen > 0) { saveLastSendIndex = pMsg->prevLogIndex + 1; + update = true; } syncLogSendAppendEntries(pSyncNode, pMsg, ""); syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); - if (pMsg->dataLen > 0) { + if (update) { pState->lastSendIndex = saveLastSendIndex; pState->lastSendTime = taosGetTimestampMs(); }