From 757a1248dc0e6e3b8d1ca6195d85d4aba8a42fb1 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 1 Jun 2022 21:23:39 +0800 Subject: [PATCH] fix: send snapshot --- source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncIO.c | 17 ++++++++++ source/libs/sync/src/syncMain.c | 10 +++++- source/libs/sync/src/syncMessage.c | 10 ++++++ source/libs/sync/src/syncReplication.c | 8 ++--- source/libs/sync/src/syncSnapshot.c | 8 +++-- .../test/syncConfigChangeSnapshotTest.cpp | 31 +++++++++++++------ 7 files changed, 68 insertions(+), 17 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 22aa7b1224..3f5c081f4a 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -157,6 +157,7 @@ typedef struct SSyncNode { // SSnapshot* pSnapshot; SSyncSnapshotSender* senders[TSDB_MAX_REPLICA]; SSyncSnapshotReceiver* receivers[TSDB_MAX_REPLICA]; + SSyncSnapshotReceiver* pNewNodeReceiver; } SSyncNode; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index e30a39e634..61d801dcbf 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -319,6 +319,23 @@ static void *syncIOConsumerFunc(void *param) { io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); } + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_SEND) { + if (io->FpOnSyncSnapshotSend != NULL) { + SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + io->FpOnSyncSnapshotSend(io->pSyncNode, pSyncMsg); + syncSnapshotSendDestroy(pSyncMsg); + } + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_RSP) { + if (io->FpOnSyncSnapshotRsp != NULL) { + SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pRpcMsg); + assert(pSyncMsg != NULL); + io->FpOnSyncSnapshotRsp(io->pSyncNode, pSyncMsg); + syncSnapshotRspDestroy(pSyncMsg); + } + } else { sTrace("unknown msgType:%d, no operator", pRpcMsg->msgType); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 219aecd22d..7878bfe7ee 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -601,6 +601,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { (pSyncNode->receivers)[i] = pReceiver; } + pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, 100); + // start in syncNodeStart // start raft // syncNodeBecomeFollower(pSyncNode); @@ -611,6 +613,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { void syncNodeStart(SSyncNode* pSyncNode) { // start raft if (pSyncNode->replicaNum == 1) { + raftStoreNextTerm(pSyncNode->pRaftStore); syncNodeBecomeLeader(pSyncNode); syncNodeLog2("==state change become leader immediately==", pSyncNode); @@ -706,6 +709,11 @@ void syncNodeClose(SSyncNode* pSyncNode) { } } + if (pSyncNode->pNewNodeReceiver != NULL) { + snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver); + pSyncNode->pNewNodeReceiver = NULL; + } + /* if (pSyncNode->pSnapshot != NULL) { taosMemoryFree(pSyncNode->pSnapshot); @@ -1294,7 +1302,7 @@ int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, Sy // get pre index and term of "index" int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) { ASSERT(index >= SYNC_INDEX_BEGIN); - ASSERT(!syncNodeIsIndexInSnapshot(pSyncNode, index)); + // ASSERT(!syncNodeIsIndexInSnapshot(pSyncNode, index)); int ret = 0; SyncIndex preIndex = index - 1; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 7314744e3b..e8fa146271 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -65,6 +65,16 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { pRoot = syncAppendEntriesReply2Json(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_SEND) { + SyncSnapshotSend* pSyncMsg = syncSnapshotSendDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + pRoot = syncSnapshotSend2Json(pSyncMsg); + syncSnapshotSendDestroy(pSyncMsg); + + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_SNAPSHOT_RSP) { + SyncSnapshotRsp* pSyncMsg = syncSnapshotRspDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); + pRoot = syncSnapshotRsp2Json(pSyncMsg); + syncSnapshotRspDestroy(pSyncMsg); + } else if (pRpcMsg->msgType == TDMT_VND_SYNC_COMMON_RESPONSE) { pRoot = cJSON_CreateObject(); char* s; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 0746f0ed6e..b645fa9ac8 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -61,6 +61,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { // set prevLogIndex SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); + SyncIndex preLogIndex = nextIndex - 1; // set preLogTerm @@ -127,13 +128,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { SRaftId* pDestId = &(pSyncNode->peersId[i]); SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); - SyncIndex preLogIndex; SyncTerm preLogTerm; - ret = syncNodeGetPreIndexTerm(pSyncNode, nextIndex, &preLogIndex, &preLogTerm); - ASSERT(ret == 0); - // batch optimized // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); @@ -181,6 +178,9 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { snapshotSenderStart(pSender); } else { + ret = syncNodeGetPreIndexTerm(pSyncNode, nextIndex, &preLogIndex, &preLogTerm); + ASSERT(ret == 0); + SyncAppendEntries* pMsg = NULL; SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex); if (pEntry != NULL) { diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 7521416379..cfd5e2f5da 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -407,7 +407,11 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { pReceiver = (pSyncNode->receivers)[i]; } } - ASSERT(pReceiver != NULL); + + // add new replica + if (pReceiver == NULL) { + pReceiver = pSyncNode->pNewNodeReceiver; + } bool needRsp = false; @@ -430,7 +434,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true); snapshotReceiverStop(pReceiver); pReceiver->ack = pMsg->seq; - needRsp = true; + needRsp = false; char *msgStr = syncSnapshotSend2Str(pMsg); sTrace("snapshot recv end ack:%d recv msg:%s", pReceiver->ack, msgStr); diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index c7fcd9f0d4..8f36f19238 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -20,6 +20,7 @@ uint16_t gPorts[] = {7010, 7110, 7210, 7310, 7410}; const char* gDir = "./syncReplicateTest"; int32_t gVgId = 1234; SyncIndex gSnapshotLastApplyIndex; +SyncIndex gSnapshotLastApplyTerm; void init() { int code = walInit(); @@ -44,8 +45,9 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { if (cbMeta.index > beginIndex) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), - "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm, - cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); + "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, flag:%lu, term:%lu \n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), + cbMeta.flag, cbMeta.term); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index); @@ -71,7 +73,7 @@ void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { pSnapshot->data = NULL; pSnapshot->lastApplyIndex = gSnapshotLastApplyIndex; - pSnapshot->lastApplyTerm = 100; + pSnapshot->lastApplyTerm = gSnapshotLastApplyTerm; return 0; } @@ -94,17 +96,18 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32 static int readIter = 0; if (readIter == 5) { + *len = 0; + *ppBuf = NULL; + } else if (readIter < 5) { *len = 20; *ppBuf = taosMemoryMalloc(*len); snprintf((char*)*ppBuf, *len, "data iter:%d", readIter); - } else { - *len = 0; - *ppBuf = NULL; } char logBuf[256] = {0}; - snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotDoRead== pFsm:%p, pReader:%p, *len:%d *ppBuf:%s", pFsm, - pReader, *len, (char*)(*ppBuf)); + snprintf(logBuf, sizeof(logBuf), + "==callback== ==SnapshotDoRead== pFsm:%p, pReader:%p, *len:%d, *ppBuf:%s, readIter:%d", pFsm, pReader, *len, + (char*)(*ppBuf), readIter); sTrace("%s", logBuf); readIter++; @@ -249,7 +252,7 @@ void configChange(int64_t rid, int32_t replicaNum, int32_t myIndex) { } void usage(char* exe) { - printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy isConfigChange \n", exe); + printf("usage: %s replicaNum myIndex lastApplyIndex writeRecordNum isStandBy isConfigChange lastApplyTerm \n", exe); } SRpcMsg* createRpcMsg(int i, int count, int myIndex) { @@ -265,7 +268,7 @@ SRpcMsg* createRpcMsg(int i, int count, int myIndex) { int main(int argc, char** argv) { tsAsyncLog = 0; sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_INFO; - if (argc != 7) { + if (argc != 8) { usage(argv[0]); exit(-1); } @@ -276,7 +279,15 @@ int main(int argc, char** argv) { int32_t writeRecordNum = atoi(argv[4]); bool isStandBy = atoi(argv[5]); bool isConfigChange = atoi(argv[6]); + int32_t lastApplyTerm = atoi(argv[7]); + + sTrace( + "args: replicaNum:%d, myIndex:%d, lastApplyIndex:%d, writeRecordNum:%d, isStandBy:%d, isConfigChange:%d, " + "lastApplyTerm:%d", + replicaNum, myIndex, lastApplyIndex, writeRecordNum, isStandBy, isConfigChange, lastApplyTerm); + gSnapshotLastApplyIndex = lastApplyIndex; + gSnapshotLastApplyTerm = lastApplyTerm; if (!isStandBy) { assert(replicaNum >= 1 && replicaNum <= 5);