diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 58b32ed025..ae1c4d9e50 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -166,21 +166,21 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); -int64_t syncLogBufferLoad(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode); int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t commitIndex); -SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf); int64_t syncNodeUpdateCommitIndex(SSyncNode* ths, SyncIndex commtIndex); SyncAppendEntries* syncLogToAppendEntries(SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm); // private +SSyncRaftEntry* syncLogBufferGetOneEntry(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex index, bool* pInBuf); int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf); int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SyncIndex toIndex); int32_t syncLogBufferReplicate(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevLogTerm); -void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index); bool syncNodeAgreedUpon(SSyncNode* pNode, SyncIndex index); +void syncIndexMgrSetIndex(SSyncIndexMgr* pSyncIndexMgr, const SRaftId* pRaftId, SyncIndex index); + typedef struct SSyncNode { // init by SSyncInfo SyncGroupId vgId; diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 285981012e..1111547a9a 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -324,11 +324,11 @@ int32_t syncLogBufferInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr()); goto _err; } - - SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); SyncIndex commitIndex = snapshot.lastApplyIndex; SyncTerm commitTerm = snapshot.lastApplyTerm; - SyncIndex toIndex = TMAX(lastVer, commitIndex); + + SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); + SyncIndex toIndex = lastVer; ASSERT(lastVer >= commitIndex); // update match index @@ -406,93 +406,6 @@ _err: return -1; } -int64_t syncLogBufferLoadOld(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) { - taosThreadMutexLock(&pBuf->mutex); - syncLogBufferValidate(pBuf); - - SSyncLogStore* pLogStore = pNode->pLogStore; - ASSERT(pBuf->startIndex <= pBuf->matchIndex); - ASSERT(pBuf->matchIndex + 1 == pBuf->endIndex); - SyncIndex index = pBuf->endIndex; - SSyncRaftEntry* pMatch = pBuf->entries[(index - 1 + pBuf->size) % pBuf->size].pItem; - ASSERT(pMatch != NULL); - - while (index - pBuf->startIndex < pBuf->size && index <= toIndex) { - SSyncRaftEntry* pEntry = NULL; - if (pLogStore->syncLogGetEntry(pLogStore, index, &pEntry) < 0) { - sError("vgId:%d, failed to get log entry since %s. index:%" PRId64 "", pNode->vgId, terrstr(), index); - ASSERT(0); - break; - } - ASSERT(pMatch->index + 1 == pEntry->index); - SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = pMatch->index, .prevLogTerm = pMatch->term}; - pBuf->entries[pBuf->endIndex % pBuf->size] = tmp; - - sInfo("vgId:%d, loaded log entry into log buffer. index: %" PRId64 ", term: %" PRId64, pNode->vgId, pEntry->index, - pEntry->term); - - pBuf->matchIndex = index; - pBuf->endIndex = index + 1; - pMatch = pEntry; - index++; - } - - syncLogBufferValidate(pBuf); - taosThreadMutexUnlock(&pBuf->mutex); - return index; -} - -int32_t syncLogBufferInitOld(SSyncLogBuffer* pBuf, SSyncNode* pNode) { - taosThreadMutexLock(&pBuf->mutex); - ASSERT(pNode->pLogStore != NULL && "log store not created"); - ASSERT(pNode->pFsm != NULL && "pFsm not registered"); - ASSERT(pNode->pFsm->FpGetSnapshotInfo != NULL && "FpGetSnapshotInfo not registered"); - - SSnapshot snapshot; - if (pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot) < 0) { - sError("vgId:%d, failed to get snapshot info since %s", pNode->vgId, terrstr()); - goto _err; - } - SyncIndex commitIndex = snapshot.lastApplyIndex; - SyncTerm commitTerm = snapshot.lastApplyTerm; - - // init log buffer indexes - pBuf->startIndex = commitIndex; - pBuf->matchIndex = commitIndex; - pBuf->commitIndex = commitIndex; - pBuf->endIndex = commitIndex + 1; - - // put a dummy record at initial commitIndex - SSyncRaftEntry* pDummy = syncEntryBuildDummy(commitTerm, commitIndex, pNode->vgId); - if (pDummy == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - SSyncLogBufEntry tmp = {.pItem = pDummy, .prevLogIndex = commitIndex - 1, .prevLogTerm = commitTerm}; - pBuf->entries[(commitIndex + pBuf->size) % pBuf->size] = tmp; - - taosThreadMutexUnlock(&pBuf->mutex); - return 0; - -_err: - taosThreadMutexUnlock(&pBuf->mutex); - return -1; -} - -int32_t syncLogBufferRollbackMatchIndex(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) { - if (toIndex <= pBuf->commitIndex) { - sError("vgId:%d, cannot rollback across commit index:%" PRId64 ", to index:%" PRId64 "", pNode->vgId, - pBuf->commitIndex, toIndex); - return -1; - } - - pBuf->matchIndex = TMIN(pBuf->matchIndex, toIndex - 1); - - // update my match index - syncIndexMgrSetIndex(pNode->pMatchIndex, &pNode->myRaftId, pBuf->matchIndex); - return 0; -} - FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { SyncIndex index = pBuf->matchIndex; SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; @@ -509,26 +422,25 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); if (index <= pBuf->commitIndex) { - sInfo("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 - " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, - pBuf->endIndex); + sDebug("vgId:%d, raft entry already committed. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); ret = 0; goto _out; } if (index - pBuf->startIndex >= pBuf->size) { - sInfo("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 - " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, - pBuf->endIndex); + sDebug("vgId:%d, raft entry out of buffer capacity. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); goto _out; } if (index > pBuf->matchIndex && lastMatchTerm != prevTerm) { - sInfo("vgId:%d, not ready to accept raft entry (i.e. across barrier). index: %" PRId64 ", term: %" PRId64 - ": prevterm: %" PRId64 " /= lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 - ", %" PRId64 ")", + sInfo("vgId:%d, not ready to accept raft entry. index: %" PRId64 ", term: %" PRId64 ": prevterm: %" PRId64 + " != lastmatch: %" PRId64 ". log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pEntry->index, pEntry->term, prevTerm, lastMatchTerm, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); goto _out; @@ -542,10 +454,10 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt if (pEntry->term != pExist->term) { (void)syncLogBufferRollback(pBuf, index); } else { - sInfo("vgId:%d, duplicate raft entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 - " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, - pBuf->endIndex); + sDebug("vgId:%d, duplicate raft entry received. index: %" PRId64 ", term: %" PRId64 ". log buffer: [%" PRId64 + " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, + pBuf->endIndex); SyncTerm existPrevTerm = pBuf->entries[index % pBuf->size].prevLogTerm; ASSERT(pEntry->term == pExist->term && prevTerm == existPrevTerm); ret = 0; @@ -647,8 +559,8 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode) { // increase match index pBuf->matchIndex = index; - sInfo("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64, - pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); + sDebug("vgId:%d, log buffer proceed. start index: %" PRId64 ", match index: %" PRId64 ", end index: %" PRId64, + pNode->vgId, pBuf->startIndex, pBuf->matchIndex, pBuf->endIndex); // replicate on demand (void)syncNodeReplicate(pNode); @@ -759,8 +671,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm } pBuf->commitIndex = index; - sInfo("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId, - pEntry->index, pEntry->term, role, term); + sDebug("vgId:%d, committed index: %" PRId64 ", term: %" PRId64 ", role: %d, current term: %" PRId64 "", pNode->vgId, + pEntry->index, pEntry->term, role, term); if (!inBuf) { syncEntryDestroy(pEntry); @@ -784,8 +696,8 @@ _out: if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) { pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); pNode->restoreFinish = true; - sInfo("vgId:%d, restore finished. commit index:%" PRId64 ", match index:%" PRId64 ", last index:%" PRId64 "", - pNode->vgId, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex - 1); + sInfo("vgId:%d, restore finished. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); } if (!inBuf) { @@ -799,6 +711,7 @@ _out: int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { SyncAppendEntriesReply* pReply = NULL; + bool accepted = false; // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { syncLogRecvAppendEntries(ths, pMsg, "not in my config"); @@ -847,20 +760,21 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { goto _IGNORE; } - sInfo("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64 - ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "", - pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex); + sDebug("vgId:%d, recv append entries msg. index:%" PRId64 ", term:%" PRId64 ", preLogIndex:%" PRId64 + ", prevLogTerm:%" PRId64 " commitIndex:%" PRId64 "", + pMsg->vgId, pMsg->prevLogIndex + 1, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex); // accept if (syncLogBufferAccept(ths->pLogBuf, ths, pEntry, pMsg->prevLogTerm) < 0) { goto _SEND_RESPONSE; } + accepted = true; _SEND_RESPONSE: pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths); bool matched = (pReply->matchIndex >= pReply->lastSendIndex); - pReply->success = matched; - if (matched) { + if (accepted && matched) { + pReply->success = true; // update commit index only after matching (void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 86d8ec11b9..ef20153fa7 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -96,8 +96,8 @@ int64_t syncNodeCheckCommitIndex(SSyncNode* ths, SyncIndex indexLikely) { if (indexLikely > ths->commitIndex && syncNodeAgreedUpon(ths, indexLikely)) { SyncIndex commitIndex = indexLikely; syncNodeUpdateCommitIndex(ths, commitIndex); - sInfo("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, - ths->pRaftStore->currentTerm, commitIndex); + sDebug("vgId:%d, agreed upon. role:%d, term:%" PRId64 ", index: %" PRId64 "", ths->vgId, ths->state, + ths->pRaftStore->currentTerm, commitIndex); } return ths->commitIndex; } @@ -140,7 +140,7 @@ int32_t syncLogBufferReplicateOneTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, Syn SyncTerm prevLogTerm = -1; SSyncLogBuffer* pBuf = pNode->pLogBuf; - sInfo("vgId:%d, replicate one msg index: %" PRId64 " to dest: 0x%016" PRIx64, pNode->vgId, index, pDestId->addr); + sDebug("vgId:%d, replicate one msg index: %" PRId64 " to dest: 0x%016" PRIx64, pNode->vgId, index, pDestId->addr); pEntry = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf); if (pEntry == NULL) { @@ -199,8 +199,8 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs ASSERT(pMsg->term == ths->pRaftStore->currentTerm); - sInfo("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", - pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); + sDebug("vgId:%d received append entries reply. srcId:0x%016" PRIx64 ", term:%" PRId64 ", matchIndex:%" PRId64 "", + pMsg->vgId, pMsg->srcId.addr, pMsg->term, pMsg->matchIndex); if (pMsg->success) { SyncIndex oldMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId)); @@ -216,7 +216,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMs // replicate log SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId); - ASSERT(pMgr != NULL); + // ASSERT(pMgr != NULL); if (pMgr != NULL) { (void)syncLogReplMgrProcessReply(pMgr, ths, pMsg); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index a96fa31f83..6ba1867635 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -51,93 +51,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { return; } - // update commit index - SyncIndex newCommitIndex = pSyncNode->commitIndex; - for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) { - bool agree = syncAgree(pSyncNode, index); - - if (agree) { - // term - SSyncRaftEntry* pEntry = NULL; - SLRUCache* pCache = pSyncNode->pLogStore->pCache; - LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index)); - if (h) { - pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); - } else { - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry); - if (code != 0) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "advance commit index error, read wal index:%" PRId64, index); - syncNodeErrorLog(pSyncNode, logBuf); - return; - } - } - // cannot commit, even if quorum agree. need check term! - if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) { - // update commit index - newCommitIndex = index; - - if (h) { - taosLRUCacheRelease(pCache, h, false); - } else { - syncEntryDestroy(pEntry); - } - - break; - } else { - do { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "can not commit due to term not equal, index:%" PRId64 ", term:%" PRIu64, - pEntry->index, pEntry->term); - syncNodeEventLog(pSyncNode, logBuf); - } while (0); - } - - if (h) { - taosLRUCacheRelease(pCache, h, false); - } else { - syncEntryDestroy(pEntry); - } - } - } - - // advance commit index as large as possible - SyncIndex walCommitVer = logStoreWalCommitVer(pSyncNode->pLogStore); - if (walCommitVer > newCommitIndex) { - newCommitIndex = walCommitVer; - } - - // maybe execute fsm - if (newCommitIndex > pSyncNode->commitIndex) { - SyncIndex beginIndex = pSyncNode->commitIndex + 1; - SyncIndex endIndex = newCommitIndex; - - // update commit index - pSyncNode->commitIndex = newCommitIndex; - - // call back Wal - pSyncNode->pLogStore->syncLogUpdateCommitIndex(pSyncNode->pLogStore, pSyncNode->commitIndex); - - // execute fsm - if (pSyncNode->pFsm != NULL) { - int32_t code = syncNodeDoCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state); - if (code != 0) { - char logBuf[128]; - snprintf(logBuf, sizeof(logBuf), "advance commit index error, do commit begin:%" PRId64 ", end:%" PRId64, - beginIndex, endIndex); - syncNodeErrorLog(pSyncNode, logBuf); - return; - } - } - } -} - -void syncMaybeAdvanceCommitIndexOld(SSyncNode* pSyncNode) { - if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { - syncNodeErrorLog(pSyncNode, "not leader, can not advance commit index"); - return; - } - // advance commit index to sanpshot first SSnapshot snapshot; pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index b428f4d2f2..a287727a39 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -111,4 +111,4 @@ int32_t syncNodeSendRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, syncRequestVote2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); return ret; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f9f6760e8c..aaed6a10d0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1116,6 +1116,8 @@ int32_t syncLogResetLogReplMgr(SSyncLogReplMgr* pMgr) { return 0; } +_Atomic int64_t tsRetryCnt = 0; + int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { if (pMgr->endIndex <= pMgr->startIndex) { return 0; @@ -1147,6 +1149,7 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { pMgr->states[pos].timeMs = nowMs; pMgr->states[pos].acked = false; retried = true; + tsRetryCnt++; } ret = 0; @@ -1185,10 +1188,10 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod (void)syncLogResetLogReplMgr(pMgr); } + // send match index SyncIndex index = TMIN(pMsg->matchIndex, pNode->pLogBuf->matchIndex); bool barrier = false; ASSERT(index >= 0); - // send match index if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &destId, &barrier) < 0) { sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, terrstr(), index, destId.addr); @@ -1206,6 +1209,17 @@ int32_t syncLogReplMgrProcessReplyInRecoveryMode(SSyncLogReplMgr* pMgr, SSyncNod return 0; } +int32_t syncLogReplMgrProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) { + SSyncLogBuffer* pBuf = pNode->pLogBuf; + taosThreadMutexLock(&pBuf->mutex); + if (pMsg->startTime != pMgr->peerStartTime) { + syncLogResetLogReplMgr(pMgr); + pMgr->peerStartTime = pMsg->startTime; + } + taosThreadMutexUnlock(&pBuf->mutex); + return 0; +} + int32_t syncLogReplMgrProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) { SSyncLogBuffer* pBuf = pNode->pLogBuf; taosThreadMutexLock(&pBuf->mutex); @@ -1245,17 +1259,19 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode } SSyncLogBuffer* pBuf = pNode->pLogBuf; - sInfo("vgId:%d, attempted to probe the %d'th peer. pMgr(restored:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + sInfo("vgId:%d, attempted to probe the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); return 0; } +_Atomic int64_t tsSendCnt = 0; + int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ASSERT(pMgr->restored); SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; - int32_t batchSize = TMAX(1, pMgr->size / 10); + int32_t batchSize = TMAX(1, pMgr->size / 20); int32_t count = 0; for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) { @@ -1278,16 +1294,17 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p pMgr->states[pos].acked = false; pMgr->endIndex = index + 1; + tsSendCnt++; if (barrier) { break; } } SSyncLogBuffer* pBuf = pNode->pLogBuf; - sInfo("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(restored:%d): [%" PRId64 " %" PRId64 - ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", - pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, - pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + sDebug("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 + "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", + pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); syncLogReplMgrRetryOnNeed(pMgr, pNode); return 0; } @@ -1704,18 +1721,13 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { SyncIndex commitIndex = pSyncNode->pLogStore->syncLogCommitIndex(pSyncNode->pLogStore); SyncIndex endIndex = pSyncNode->pLogBuf->endIndex; + ASSERT(endIndex == lastVer + 1); commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, commitIndex) < 0) { return -1; } - if (endIndex <= lastVer) { - sError("vgId:%d, failed to load log entries into log buffers. commit index:%" PRId64 ", lastVer: %" PRId64 "", - pSyncNode->vgId, commitIndex, lastVer); - return -1; - } - return 0; } @@ -2722,9 +2734,8 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) { (void)syncLogBufferRollback(pBuf, pBuf->matchIndex + 1); - sInfo("vgId:%d, reset log buffer. start index: %" PRId64 ", commit index: %" PRId64 ", match Index: %" PRId64 - ", end index: %" PRId64 "", - pNode->vgId, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); + sInfo("vgId:%d, reset log buffer. pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, + pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->endIndex = pBuf->matchIndex + 1; @@ -3380,10 +3391,10 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { // proceed match index, with replicating on needed SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths); - sInfo("vgId:%d, append raft log index: %" PRId64 ", term: %" PRId64 " log buffer: [%" PRId64 " %" PRId64 " %" PRId64 - ", %" PRId64 ")", - ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, - ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); + sDebug("vgId:%d, append raft entry. index: %" PRId64 ", term: %" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64 + ", %" PRId64 ")", + ths->vgId, pEntry->index, pEntry->term, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, + ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex); // multi replica if (ths->replicaNum > 1) { @@ -3521,6 +3532,15 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg) { } int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg) { + SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId); + if (pMgr == NULL) { + sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x016%" PRIx64 "", ths->vgId, pMsg->srcId.addr); + return -1; + } + return syncLogReplMgrProcessHeartbeatReply(pMgr, ths, pMsg); +} + +int32_t syncNodeOnHeartbeatReplyOld(SSyncNode* ths, SyncHeartbeatReply* pMsg) { syncLogRecvHeartbeatReply(ths, pMsg, ""); // update last reply time, make decision whether the other node is alive or not diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 3dcd2d8cdf..1c4b997875 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -173,8 +173,8 @@ int32_t syncNodeReplicateOld(SSyncNode* pSyncNode) { } int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, SRaftId* destRaftId, SyncAppendEntries* pMsg) { - sInfo("vgId:%d, send append entries msg index: %" PRId64 " to dest: 0x%016" PRId64, pSyncNode->vgId, - pMsg->prevLogIndex + 1, destRaftId->addr); + sTrace("vgId:%d, send append entries msg index: %" PRId64 " to dest: 0x%016" PRId64, pSyncNode->vgId, + pMsg->prevLogIndex + 1, destRaftId->addr); int32_t ret = 0; pMsg->destId = *destRaftId; SRpcMsg rpcMsg; diff --git a/source/libs/sync/src/syncTimeout.c b/source/libs/sync/src/syncTimeout.c index 17c8c14136..3fe7f08816 100644 --- a/source/libs/sync/src/syncTimeout.c +++ b/source/libs/sync/src/syncTimeout.c @@ -135,4 +135,4 @@ int32_t syncNodeOnTimer(SSyncNode* ths, SyncTimeout* pMsg) { } return ret; -} \ No newline at end of file +}