diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 0b997690a1..3a808ac6f3 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -38,6 +38,7 @@ extern "C" { #define SYNC_MNODE_LOG_RETENTION 10000 #define SYNC_VNODE_LOG_RETENTION 100 #define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10 +#define SNAPSHOT_WAIT_MS 1000 * 30 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8232b35284..2a3c07c2ef 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -282,8 +282,9 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId); void syncNodeVoteForSelf(SSyncNode* pSyncNode); // snapshot -------------- -bool syncNodeHasSnapshot(SSyncNode* pSyncNode); -void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); +bool syncNodeHasSnapshot(SSyncNode* pSyncNode); +void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode); +int32_t syncNodeStartSnapshot(SSyncNode* pSyncNode, SRaftId* pDestId); SyncIndex syncNodeGetLastIndex(const SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 0ce6c404eb..7da610a9ed 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -51,7 +51,7 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode); int32_t syncNodeSendHeartbeat(SSyncNode* pSyncNode, const SRaftId* pDestId, SRpcMsg* pMsg); int32_t syncNodeReplicate(SSyncNode* pSyncNode); -int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId); +int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 963fedce31..93b2531235 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -43,6 +43,7 @@ typedef struct SSyncSnapshotSender { int64_t sendingMS; SyncTerm term; int64_t startTime; + int64_t endTime; bool finish; // init when create @@ -59,14 +60,17 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { - bool start; - int32_t ack; + // update when pre snapshot + bool start; + int32_t ack; + SyncTerm term; + SRaftId fromId; + int64_t startTime; + + // update when begin void *pWriter; - SyncTerm term; SSnapshotParam snapshotParam; SSnapshot snapshot; - SRaftId fromId; - int64_t startTime; // init when create SSyncNode *pSyncNode; @@ -82,7 +86,8 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceive // on message int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg); -int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId); + +// start #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index ce6aad1983..0b6119457a 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -185,7 +185,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { if (pMsg->prevLogIndex >= startIndex) { SyncTerm myPreLogTerm = syncNodeGetPreTerm(ths, pMsg->prevLogIndex + 1); - ASSERT(myPreLogTerm != SYNC_TERM_INVALID); + // ASSERT(myPreLogTerm != SYNC_TERM_INVALID); + if (myPreLogTerm == SYNC_TERM_INVALID) { + syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term invalid"); + goto _SEND_RESPONSE; + } if (myPreLogTerm != pMsg->prevLogTerm) { syncLogRecvAppendEntries(ths, pMsg, "reject, pre-term not match"); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index a6a9389ca6..6b2bf250ad 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -119,7 +119,7 @@ int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ASSERT(pState != NULL); if (pMsg->lastSendIndex == pState->lastSendIndex) { - syncNodeReplicateOne(ths, &(pMsg->srcId)); + syncNodeReplicateOne(ths, &(pMsg->srcId), true); } } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bd7b119105..0952f87ed7 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2604,10 +2604,47 @@ void syncLogRecvSyncPreSnapshotReply(SSyncNode* pSyncNode, const SyncPreSnapshot port, pMsg->term, pMsg->snapStart, s); } -void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {} +void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); -void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {} + sNTrace(pSyncNode, + "send sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", end:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", seq:%d}, %s", + host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, s); +} -void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {} +void syncLogRecvSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); -void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) {} + sNTrace(pSyncNode, + "recv sync-snapshot-send from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", seq:%d, len:%u}, %s", + host, port, pMsg->term, pMsg->beginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->seq, + pMsg->dataLen, s); +} + +void syncLogSendSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "send sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", ack:%d}, %s", + host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); +} + +void syncLogRecvSyncSnapshotRsp(SSyncNode* pSyncNode, const SyncSnapshotRsp* pMsg, const char* s) { + char host[64]; + uint16_t port; + syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); + + sNTrace(pSyncNode, + "recv sync-snapshot-rsp from %s:%d {term:%" PRId64 ", begin:%" PRId64 ", lst:%" PRId64 ", lterm:%" PRId64 + ", stime:%" PRId64 ", ack:%d}, %s", + host, port, pMsg->term, pMsg->snapBeginIndex, pMsg->lastIndex, pMsg->lastTerm, pMsg->startTime, pMsg->ack, s); +} diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index f6ed76bbc8..0f2057210c 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -48,19 +48,20 @@ static int32_t syncNodeMaybeSendAppendEntries(SSyncNode* pNode, const SRaftId* d // mdest |-> j]) // /\ UNCHANGED <> -int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId) { +int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapshot) { // next index SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); - // maybe start snapshot - SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); - SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); - if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) { - sNTrace(pSyncNode, "maybe start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64, nextIndex, - logStartIndex, logEndIndex); - // start snapshot - // int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId); - return 0; + if (snapshot) { + // maybe start snapshot + SyncIndex logStartIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); + SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore); + if (nextIndex < logStartIndex || nextIndex - 1 > logEndIndex) { + sNTrace(pSyncNode, "maybe start snapshot for next-index:%" PRId64 ", start:%" PRId64 ", end:%" PRId64, nextIndex, + logStartIndex, logEndIndex); + // start snapshot + int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId); + } } // pre index, pre term @@ -124,7 +125,7 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { SRaftId* pDestId = &(pSyncNode->peersId[i]); - ret = syncNodeReplicateOne(pSyncNode, pDestId); + ret = syncNodeReplicateOne(pSyncNode, pDestId, true); if (ret != 0) { char host[64]; int16_t port; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 3273718f7d..41dc3d3c39 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -19,6 +19,7 @@ #include "syncRaftCfg.h" #include "syncRaftLog.h" #include "syncRaftStore.h" +#include "syncReplication.h" #include "syncUtil.h" SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { @@ -44,6 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->startTime = 0; + pSender->endTime = 0; pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; } else { @@ -119,6 +121,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { SRpcMsg rpcMsg; syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); + syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); syncSnapshotSendDestroy(pMsg); // event log @@ -130,6 +133,7 @@ int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // update flag pSender->start = false; pSender->finish = finish; + pSender->endTime = taosGetTimestampMs(); // close reader if (pSender->pReader != NULL) { @@ -191,6 +195,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { SRpcMsg rpcMsg; syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); + syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); syncSnapshotSendDestroy(pMsg); // event log @@ -226,6 +231,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { SRpcMsg rpcMsg; syncSnapshotSend2RpcMsg(pMsg, &rpcMsg); syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg); + syncLogSendSyncSnapshotSend(pSender->pSyncNode, pMsg, ""); syncSnapshotSendDestroy(pMsg); // event log @@ -241,6 +247,9 @@ static void snapshotSenderUpdateProgress(SSyncSnapshotSender *pSender, SyncSnaps ++(pSender->seq); } +// return 0, start ok +// return 1, last snapshot finish ok +// return -1, error int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { sNTrace(pSyncNode, "starting snapshot ..."); @@ -253,11 +262,14 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { int32_t code = 0; if (snapshotSenderIsStart(pSender)) { - code = snapshotSenderStop(pSender, false); - if (code != 0) { - sNError(pSyncNode, "snapshot sender stop error"); - return -1; - } + sNTrace(pSyncNode, "snapshot sender already start, ignore"); + return 0; + } + + if (!snapshotSenderIsStart(pSender) && pSender->finish && + taosGetTimestampMs() - pSender->endTime < SNAPSHOT_WAIT_MS) { + sNTrace(pSyncNode, "snapshot sender too frequently, ignore"); + return 1; } code = snapshotSenderStart(pSender); @@ -316,36 +328,6 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; } -// static do start by privateTerm, pBeginMsg -// receive first snapshot data -// write first block data -static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { - pReceiver->start = true; - pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; - - // start writer - ASSERT(pReceiver->pWriter == NULL); - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, - &(pReceiver->snapshotParam), &(pReceiver->pWriter)); - ASSERT(ret == 0); - - pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; - pReceiver->snapshotParam.start = pBeginMsg->beginIndex; - pReceiver->snapshotParam.end = pBeginMsg->lastIndex; - - pReceiver->fromId = pBeginMsg->srcId; - - // update snapshot - pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; - pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm; - pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex; - - pReceiver->startTime = pBeginMsg->startTime; - - // event log - sRTrace(pReceiver, "snapshot receiver start"); -} - // force stop void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { // force close, abandon incomplete data @@ -362,10 +344,43 @@ void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { sRTrace(pReceiver, "snapshot receiver force stop"); } -// if receiver receive msg from seq = SYNC_SNAPSHOT_SEQ_BEGIN, start receiver -int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { +int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { + ASSERT(snapshotReceiverIsStart(pReceiver)); + + // update ack + pReceiver->ack = SYNC_SNAPSHOT_SEQ_BEGIN; + + // update snapshot + pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex; + pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm; + pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex; + + pReceiver->snapshotParam.start = pBeginMsg->beginIndex; + pReceiver->snapshotParam.end = pBeginMsg->lastIndex; + + // start writer + ASSERT(pReceiver->pWriter == NULL); + int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, + &(pReceiver->snapshotParam), &(pReceiver->pWriter)); + ASSERT(ret == 0); + + // event log + sRTrace(pReceiver, "snapshot receiver start writer"); + + return 0; +} + +int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pPreMsg) { ASSERT(!snapshotReceiverIsStart(pReceiver)); - snapshotReceiverDoStart(pReceiver, pBeginMsg); + + pReceiver->start = true; + pReceiver->ack = SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT; + pReceiver->term = pReceiver->pSyncNode->pRaftStore->currentTerm; + pReceiver->fromId = pPreMsg->srcId; + pReceiver->startTime = pPreMsg->startTime; + + // event log + sRTrace(pReceiver, "snapshot receiver start"); return 0; } @@ -518,7 +533,10 @@ _START_RECEIVER: return -1; } else { // waiting for clock match - while (taosGetTimestampMs() > pMsg->startTime) { + int64_t timeNow = taosGetTimestampMs(); + while (timeNow < pMsg->startTime) { + sNTrace(pSyncNode, "snapshot receiver pre waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, + pMsg->startTime); taosMsleep(10); } @@ -543,6 +561,7 @@ _SEND_REPLY: SRpcMsg rpcMsg; syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); + syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); syncSnapshotRspDestroy(pRspMsg); return 0; @@ -552,54 +571,119 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p // condition 1 SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; - if (snapshotReceiverIsStart(pReceiver)) { - if (pMsg->startTime > pReceiver->startTime) { - snapshotReceiverStop(pReceiver); - - } else if (pMsg->startTime == pReceiver->startTime) { - return 0; - } else { - // ignore - sNTrace(pSyncNode, "msg ignore"); - return 0; - } - } - -_START_RECEIVER: - if (taosGetTimestampMs() - pMsg->startTime > SNAPSHOT_MAX_CLOCK_SKEW_MS) { - sNError(pSyncNode, "snapshot receiver time skew too much"); + if (!snapshotReceiverIsStart(pReceiver)) { + sNError(pSyncNode, "snapshot receiver not start"); return -1; - } else { - // waiting for clock match - while (taosGetTimestampMs() > pMsg->startTime) { - taosMsleep(10); - } - - snapshotReceiverStart(pReceiver, pMsg); - - // build msg - SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); - pRspMsg->srcId = pSyncNode->myRaftId; - pRspMsg->destId = pMsg->srcId; - pRspMsg->term = pSyncNode->pRaftStore->currentTerm; - pRspMsg->lastIndex = pMsg->lastIndex; - pRspMsg->lastTerm = pMsg->lastTerm; - pRspMsg->ack = pReceiver->ack; // receiver maybe already closed - pRspMsg->code = 0; - - // send msg - SRpcMsg rpcMsg; - syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); - syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); - syncSnapshotRspDestroy(pRspMsg); } + if (pReceiver->startTime != pMsg->startTime) { + sNError(pSyncNode, "snapshot receiver time not equal"); + return -1; + } + + // start writer + snapshotReceiverStartWriter(pReceiver, pMsg); + + // build msg + SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); + pRspMsg->srcId = pSyncNode->myRaftId; + pRspMsg->destId = pMsg->srcId; + pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->lastIndex = pMsg->lastIndex; + pRspMsg->lastTerm = pMsg->lastTerm; + pRspMsg->startTime = pReceiver->startTime; + pRspMsg->ack = pReceiver->ack; // receiver maybe already closed + pRspMsg->code = 0; + pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; + + // send msg + SRpcMsg rpcMsg; + syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); + syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); + syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); + syncSnapshotRspDestroy(pRspMsg); + return 0; } -static int32_t syncNodeOnSnapshotTransfer(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { return 0; } +static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { + // condition 4 + // transfering + SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; -static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { return 0; } + // waiting for clock match + int64_t timeNow = taosGetTimestampMs(); + while (timeNow < pMsg->startTime) { + sNTrace(pSyncNode, "snapshot receiver transfering waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, + pMsg->startTime); + taosMsleep(10); + } + + if (pMsg->seq == pReceiver->ack + 1) { + snapshotReceiverGotData(pReceiver, pMsg); + } + + // build msg + SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); + pRspMsg->srcId = pSyncNode->myRaftId; + pRspMsg->destId = pMsg->srcId; + pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->lastIndex = pMsg->lastIndex; + pRspMsg->lastTerm = pMsg->lastTerm; + pRspMsg->startTime = pReceiver->startTime; + pRspMsg->ack = pReceiver->ack; // receiver maybe already closed + pRspMsg->code = 0; + pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; + + // send msg + SRpcMsg rpcMsg; + syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); + syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); + syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); + syncSnapshotRspDestroy(pRspMsg); + + return 0; +} + +static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { + // condition 2 + // end, finish FSM + SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; + + // waiting for clock match + int64_t timeNow = taosGetTimestampMs(); + while (timeNow < pMsg->startTime) { + sNTrace(pSyncNode, "snapshot receiver finish waitting for true time, now:%" PRId64 ", stime:%" PRId64, timeNow, + pMsg->startTime); + taosMsleep(10); + } + + int32_t code = snapshotReceiverFinish(pReceiver, pMsg); + if (code == 0) { + snapshotReceiverStop(pReceiver); + } + + // build msg + SyncSnapshotRsp *pRspMsg = syncSnapshotRspBuild(pSyncNode->vgId); + pRspMsg->srcId = pSyncNode->myRaftId; + pRspMsg->destId = pMsg->srcId; + pRspMsg->term = pSyncNode->pRaftStore->currentTerm; + pRspMsg->lastIndex = pMsg->lastIndex; + pRspMsg->lastTerm = pMsg->lastTerm; + pRspMsg->startTime = pReceiver->startTime; + pRspMsg->ack = pReceiver->ack; // receiver maybe already closed + pRspMsg->code = 0; + pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; + + // send msg + SRpcMsg rpcMsg; + syncSnapshotRsp2RpcMsg(pRspMsg, &rpcMsg); + syncNodeSendMsgById(&(pRspMsg->destId), pSyncNode, &rpcMsg); + syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, ""); + syncSnapshotRspDestroy(pRspMsg); + + return 0; +} // receiver on message // @@ -641,6 +725,8 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { int32_t code = 0; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; + syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, ""); + // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { @@ -651,39 +737,14 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { syncNodeOnSnapshotBegin(pSyncNode, pMsg); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { - // condition 2 - // end, finish FSM - code = snapshotReceiverFinish(pReceiver, pMsg); - if (code == 0) { - snapshotReceiverStop(pReceiver); - } - bool needRsp = true; - - // maybe update lastconfig - if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { - SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg; - - // update new config myIndex - SSyncCfg newSyncCfg = pMsg->lastConfig; - syncNodeUpdateNewConfigIndex(pSyncNode, &newSyncCfg); - - // do config change - syncNodeDoConfigChange(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex); - } + syncNodeOnSnapshotEnd(pSyncNode, pMsg); } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { - // condition 3 - // force close + // force close, no response snapshotReceiverForceStop(pReceiver); - bool needRsp = false; } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { - // condition 4 - // transfering - if (pMsg->seq == pReceiver->ack + 1) { - snapshotReceiverGotData(pReceiver, pMsg); - } - bool needRsp = true; + syncNodeOnSnapshotTransfering(pSyncNode, pMsg); } else { // error log @@ -717,11 +778,17 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) pSender->snapshotParam.start = pMsg->snapBeginIndex; pSender->snapshotParam.end = snapshot.lastApplyIndex; + sNTrace(pSyncNode, "prepare snapshot, recv-begin:%" PRId64 ", snapshot.last:%" PRId64 ", snapshot.term:%" PRId64, + pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm); + if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) { sNError(pSyncNode, "snapshot last index too small"); return -1; } + // update sender + pSender->snapshot = snapshot; + // start reader int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &(pSender->snapshotParam), &(pSender->pReader)); if (code != 0) { @@ -729,6 +796,12 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) return -1; } + // update next index + syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1); + + // update seq + pSender->seq = SYNC_SNAPSHOT_SEQ_BEGIN; + // build begin msg SyncSnapshotSend *pSendMsg = syncSnapshotSendBuild(0, pSender->pSyncNode->vgId); pSendMsg->srcId = pSender->pSyncNode->myRaftId; @@ -746,6 +819,7 @@ int32_t syncNodeOnSnapshotReplyPre(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) SRpcMsg rpcMsg; syncSnapshotSend2RpcMsg(pSendMsg, &rpcMsg); syncNodeSendMsgById(&(pSendMsg->destId), pSender->pSyncNode, &rpcMsg); + syncLogSendSyncSnapshotSend(pSyncNode, pSendMsg, ""); syncSnapshotSendDestroy(pSendMsg); return 0; @@ -773,6 +847,8 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { return -1; } + syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, ""); + // state, term, seq/ack if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pMsg->term == pSyncNode->pRaftStore->currentTerm) { @@ -782,9 +858,20 @@ int32_t syncNodeOnSnapshotReply(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { return 0; } + if (pMsg->ack == SYNC_SNAPSHOT_SEQ_BEGIN) { + snapshotSenderUpdateProgress(pSender, pMsg); + snapshotSend(pSender); + return 0; + } + // receive ack is finish, close sender if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { snapshotSenderStop(pSender, true); + + // update next-index + syncIndexMgrSetIndex(pSyncNode->pNextIndex, &(pMsg->srcId), pMsg->lastIndex + 1); + syncNodeReplicateOne(pSyncNode, &(pMsg->srcId), false); + return 0; } diff --git a/source/libs/sync/test/syncRespMgrTest.cpp b/source/libs/sync/test/syncRespMgrTest.cpp index 8d709e8c81..b609ef1a0e 100644 --- a/source/libs/sync/test/syncRespMgrTest.cpp +++ b/source/libs/sync/test/syncRespMgrTest.cpp @@ -60,13 +60,13 @@ void syncRespMgrGetTest(uint64_t i) { void syncRespMgrGetAndDelTest(uint64_t i) { printf("------syncRespMgrGetAndDelTest-------%" PRIu64 "-- \n", i); - // SRespStub stub; - // int32_t ret = syncRespMgrGetAndDel(pMgr, i, &stub); - // if (ret == 1) { - // printStub(&stub); - // } else if (ret == 0) { - // printf("%" PRId64 " notFound \n", i); - // } + SRpcHandleInfo stub; + int32_t ret = syncRespMgrGetAndDel(pMgr, i, &stub); + if (ret == 1) { + //printStub(&stub); + } else if (ret == 0) { + printf("%" PRId64 " notFound \n", i); + } } SSyncNode *createSyncNode() {