diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 076526e6c9..bd44718c58 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -225,7 +225,8 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); -bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); +bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); +SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); // for debug -------------- void syncNodePrint(SSyncNode* pObj); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 41bf7ac778..2fc1cc07ed 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -549,6 +549,15 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, term:%lu", ths->pRaftStore->currentTerm); syncAppendEntriesLog2(logBuf, pMsg); + // if I am standby, be added into a raft group, I should process SyncAppendEntries msg + /* + // if already drop replica, do not process + if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { + sInfo("recv SyncAppendEntries maybe replica already dropped"); + return ret; + } + */ + // maybe update term if (pMsg->term > ths->pRaftStore->currentTerm) { syncNodeUpdateTerm(ths, pMsg->term); @@ -598,7 +607,36 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs return ret; } - // case 3, accept request + // case 3, index in my snapshot + if (pMsg->term == ths->pRaftStore->currentTerm && syncNodeHasSnapshot(ths)) { + SSnapshot snapshot; + ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + if (pMsg->prevLogIndex < snapshot.lastApplyIndex) { + sTrace( + "recv SyncAppendEntries, accept, in snapshot, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d, " + "snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", + pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + + // prepare response msg + SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); + pReply->srcId = ths->myRaftId; + pReply->destId = pMsg->srcId; + pReply->term = ths->pRaftStore->currentTerm; + pReply->success = true; + pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; + pReply->matchIndex = snapshot.lastApplyIndex - 1; + + // send response + SRpcMsg rpcMsg; + syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); + syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); + syncAppendEntriesReplyDestroy(pReply); + + return ret; + } + } + + // case 4, accept request if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { // has extra entries (> preIndex) in local log SyncIndex myLastIndex = syncNodeGetLastIndex(ths); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 94406c458e..16b0f79e3d 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -106,7 +106,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries // if already drop replica, do not process if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { - sInfo("maybe already dropped"); + sInfo("recv SyncAppendEntriesReply, maybe replica already dropped"); return ret; } @@ -195,35 +195,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries sentryIndex = pSender->snapshot.lastApplyIndex; } } -#if 0 - SyncIndex sentryIndex; - if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) { - // already start - sentryIndex = pSender->snapshot.lastApplyIndex; - sTrace("sending snapshot already start: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu", pSender->term, - ths->pRaftStore->currentTerm); - - } else { - if (pMsg->privateTerm == pSender->privateTerm) { - sTrace("same privateTerm, pMsg->privateTerm:%lu, pSender->privateTerm:%lu, do not start snapshot again", - pMsg->privateTerm, pSender->privateTerm); - } else { - // start send snapshot, first time - sTrace( - "sending snapshot start first: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu, " - "pMsg->privateTerm:%lu, pSender->privateTerm:%lu", - pSender->term, ths->pRaftStore->currentTerm, pMsg->privateTerm, pSender->privateTerm); - - snapshotSenderDoStart(pSender); - pSender->start = true; - - // update snapshot private term - syncIndexMgrSetTerm(ths->pNextIndex, &(pMsg->srcId), pSender->privateTerm); - } - - sentryIndex = pSender->snapshot.lastApplyIndex; - } -#endif // update nextIndex to sentryIndex + 1 if (nextIndex <= sentryIndex) { diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index a64ca36c72..c092b31adf 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -96,116 +96,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (pSyncNode->pFsm != NULL) { int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state); ASSERT(code == 0); - -#if 0 - for (SyncIndex i = beginIndex; i <= endIndex; ++i) { - if (i != SYNC_INDEX_INVALID) { - SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, i); - assert(pEntry != NULL); - - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); - - if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { - SFsmCbMeta cbMeta; - cbMeta.index = pEntry->index; - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = pSyncNode->state; - cbMeta.seqNum = pEntry->seqNum; - cbMeta.term = pEntry->term; - cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; - cbMeta.flag = 0x1; - - SSnapshot snapshot; - ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL); - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); - - bool needExecute = true; - if (cbMeta.index <= snapshot.lastApplyIndex) { - needExecute = false; - } - - if (needExecute) { - pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); - } - } - - // config change - if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { - SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg; - - SSyncCfg newSyncCfg; - int32_t ret = syncCfgFromStr(rpcMsg.pCont, &newSyncCfg); - ASSERT(ret == 0); - - // update new config myIndex - bool hit = false; - for (int i = 0; i < newSyncCfg.replicaNum; ++i) { - if (strcmp(pSyncNode->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 && - pSyncNode->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) { - newSyncCfg.myIndex = i; - hit = true; - break; - } - } - - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - ASSERT(hit == true); - } - - bool isDrop; - syncNodeUpdateConfig(pSyncNode, &newSyncCfg, &isDrop); - - // change isStandBy to normal - if (!isDrop) { - if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { - syncNodeBecomeLeader(pSyncNode); - } else { - syncNodeBecomeFollower(pSyncNode); - } - } - - char* sOld = syncCfg2Str(&oldSyncCfg); - char* sNew = syncCfg2Str(&newSyncCfg); - sInfo("==config change== 0x1 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop); - taosMemoryFree(sOld); - taosMemoryFree(sNew); - - if (pSyncNode->pFsm->FpReConfigCb != NULL) { - SReConfigCbMeta cbMeta = {0}; - cbMeta.code = 0; - cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; - cbMeta.index = pEntry->index; - cbMeta.term = pEntry->term; - cbMeta.oldCfg = oldSyncCfg; - cbMeta.flag = 0x1; - cbMeta.isDrop = isDrop; - pSyncNode->pFsm->FpReConfigCb(pSyncNode->pFsm, newSyncCfg, cbMeta); - } - } - - // restore finish - if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) { - if (pSyncNode->restoreFinish == false) { - if (pSyncNode->pFsm->FpRestoreFinishCb != NULL) { - pSyncNode->pFsm->FpRestoreFinishCb(pSyncNode->pFsm); - } - pSyncNode->restoreFinish = true; - sInfo("==syncMaybeAdvanceCommitIndex== restoreFinish set true %p vgId:%d", pSyncNode, pSyncNode->vgId); - - /* - tsem_post(&pSyncNode->restoreSem); - sInfo("==syncMaybeAdvanceCommitIndex== RestoreFinish tsem_post %p", pSyncNode); - */ - } - } - - rpcFreeCont(rpcMsg.pCont); - syncEntryDestory(pEntry); - } - } -#endif } } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 80a3fb478e..8db4d6bd11 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -835,6 +835,10 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); if (pSyncNode->FpSendMsg != NULL) { + char logBuf[128] = {0}; + snprintf(logBuf, sizeof(logBuf), "==syncNodeSendMsgById== msgType:%d", pMsg->msgType); + syncRpcMsgLog2(logBuf, pMsg); + // htonl syncUtilMsgHtoN(pMsg->pCont); @@ -1817,4 +1821,14 @@ bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId) { } } return false; +} + +SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId) { + SSyncSnapshotSender* pSender = NULL; + for (int i = 0; i < ths->replicaNum; ++i) { + if (syncUtilSameId(pDestId, &((ths->replicasId)[i]))) { + pSender = (ths->senders)[i]; + } + } + return pSender; } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index ff70202f87..d55f0b80d5 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -44,6 +44,8 @@ static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); // refactor, log[0 .. n] ==> log[m .. n] static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) { + sTrace("raftLogSetBeginIndex beginIndex:%ld", beginIndex); + // if beginIndex == 0, donot need call this funciton ASSERT(beginIndex > 0); diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 4b2d444596..d939244906 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -103,6 +103,12 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVote, currentTerm:%lu", ths->pRaftStore->currentTerm); syncRequestVoteLog2(logBuf, pMsg); + // if already drop replica, do not process + if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { + sInfo("recv SyncRequestVote maybe replica already dropped"); + return ret; + } + // maybe update term if (pMsg->term > ths->pRaftStore->currentTerm) { syncNodeUpdateTerm(ths, pMsg->term); diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index d2009fb355..9cc905d847 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -101,6 +101,12 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVoteReply, term:%lu", ths->pRaftStore->currentTerm); syncRequestVoteReplyLog2(logBuf, pMsg); + // if already drop replica, do not process + if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { + sInfo("recv SyncRequestVoteReply, maybe replica already dropped"); + return ret; + } + // drop stale response if (pMsg->term < ths->pRaftStore->currentTerm) { sTrace("recv SyncRequestVoteReply, drop stale response, receive_term:%lu current_term:%lu", pMsg->term, diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 5715354de0..021306397b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -438,7 +438,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { needRsp = true; char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace("snapshot recv begin ack:%d recv msg:%s", pReceiver->ack, msgStr); + sTrace("snapshot recv begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); taosMemoryFree(msgStr); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { @@ -448,7 +448,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1); char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); - sInfo("snapshot receive finish, update log begin index:%ld, raft log:%s", pMsg->lastIndex + 1, logSimpleStr); + SSnapshot snapshot; + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + sInfo("snapshot recv finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu, raft log:%s", pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr); taosMemoryFree(logSimpleStr); // walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex); @@ -460,7 +462,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { needRsp = true; char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace("snapshot recv end ack:%d recv msg:%s", pReceiver->ack, msgStr); + sTrace("snapshot recv end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); taosMemoryFree(msgStr); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { @@ -469,7 +471,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { needRsp = false; char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace("snapshot recv force close ack:%d recv msg:%s", pReceiver->ack, msgStr); + sTrace("snapshot recv force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); + + taosMemoryFree(msgStr); } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { @@ -481,7 +485,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { needRsp = true; char *msgStr = syncSnapshotSend2Str(pMsg); - sTrace("snapshot recv receiving ack:%d recv msg:%s", pReceiver->ack, msgStr); + sTrace("snapshot recv receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr); taosMemoryFree(msgStr); } else {