diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 9176d7b317..dcbb76fe49 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -398,6 +398,7 @@ typedef struct SyncSnapshotSend { SyncTerm term; SyncIndex lastIndex; // lastIndex of snapshot SyncTerm lastTerm; // lastTerm of snapshot + SyncTerm privateTerm; int32_t seq; uint32_t dataLen; char data[]; @@ -432,6 +433,7 @@ typedef struct SyncSnapshotRsp { SyncTerm term; SyncIndex lastIndex; SyncTerm lastTerm; + SyncTerm privateTerm; int32_t ack; } SyncSnapshotRsp; diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h index c56dedb7ec..1983d407df 100644 --- a/source/libs/sync/inc/syncIndexMgr.h +++ b/source/libs/sync/inc/syncIndexMgr.h @@ -41,8 +41,8 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index); SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); -cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); -char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); +cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr); +char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr); void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term); SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId); diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index a297217b74..383f731ab9 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -39,8 +39,8 @@ typedef struct SSyncSnapshotSender { bool start; int32_t seq; int32_t ack; - void *pReader; - void *pCurrentBlock; + void * pReader; + void * pCurrentBlock; int32_t blockLen; SSnapshot snapshot; int64_t sendingMS; @@ -55,19 +55,20 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender); SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); void snapshotSenderDestroy(SSyncSnapshotSender *pSender); -void snapshotSenderStart(SSyncSnapshotSender *pSender); -void snapshotSenderStop(SSyncSnapshotSender *pSender); -int32_t snapshotSend(SSyncSnapshotSender *pSender); -int32_t snapshotReSend(SSyncSnapshotSender *pSender); -cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); -char *snapshotSender2Str(SSyncSnapshotSender *pSender); +// void snapshotSenderStart(SSyncSnapshotSender *pSender); +void snapshotSenderStop(SSyncSnapshotSender *pSender); +int32_t snapshotSend(SSyncSnapshotSender *pSender); +int32_t snapshotReSend(SSyncSnapshotSender *pSender); +cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { bool start; int32_t ack; - void *pWriter; + void * pWriter; SyncTerm term; + SyncTerm privateTerm; SSyncNode *pSyncNode; int32_t replicaIndex; @@ -77,8 +78,8 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, int32_t repl void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); -cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); -char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); +cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4ed64c43df..ec377d6e92 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -576,6 +576,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->term = ths->pRaftStore->currentTerm; pReply->success = false; pReply->matchIndex = SYNC_INDEX_INVALID; + pReply->privateTerm = pMsg->privateTerm; SRpcMsg rpcMsg; syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); @@ -660,6 +661,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs pReply->destId = pMsg->srcId; pReply->term = ths->pRaftStore->currentTerm; pReply->success = true; + pReply->privateTerm = pMsg->privateTerm; if (hasAppendEntries) { pReply->matchIndex = pMsg->prevLogIndex + 1; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 841ee15899..1a74b0dd72 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -175,6 +175,8 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ASSERT(pSender != NULL); SyncIndex sentryIndex; +#if 0 + SyncIndex sentryIndex; if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) { // already start sentryIndex = pSender->snapshot.lastApplyIndex; @@ -182,14 +184,26 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ths->pRaftStore->currentTerm); } else { - // start send snapshot, first time - sTrace("sending snapshot start first: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu", pSender->term, - ths->pRaftStore->currentTerm); + if (pMsg->privateTerm == pSender->privateTerm) { + sTrace("same privateTerm, pMsg->privateTerm:%lu, pSender->privateTerm:%lu, do not start snapshot again", + pMsg->privateTerm, pSender->privateTerm); + } else { + // start send snapshot, first time + sTrace( + "sending snapshot start first: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu, " + "pMsg->privateTerm:%lu, pSender->privateTerm:%lu", + pSender->term, ths->pRaftStore->currentTerm, pMsg->privateTerm, pSender->privateTerm); + + snapshotSenderDoStart(pSender); + pSender->start = true; + + // update snapshot private term + syncIndexMgrSetTerm(ths->pNextIndex, &(pMsg->srcId), pSender->privateTerm); + } - snapshotSenderDoStart(pSender); - pSender->start = true; sentryIndex = pSender->snapshot.lastApplyIndex; } +#endif // update nextIndex to sentryIndex + 1 if (nextIndex <= sentryIndex) { diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index eec6adf32f..06906195dc 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -119,7 +119,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) { char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); - char *serialized = cJSON_Print(pJson); + char * serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 126229af42..7fb6263612 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -1815,6 +1815,9 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) { snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->privateTerm); + cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex); cJSON_AddStringToObject(pRoot, "lastIndex", u64buf); @@ -1978,6 +1981,9 @@ cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg) { snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term); cJSON_AddStringToObject(pRoot, "term", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->privateTerm); + cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); + snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex); cJSON_AddStringToObject(pRoot, "lastIndex", u64buf); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index ab3614bac3..598260f390 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -177,6 +177,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { pMsg->prevLogIndex = preLogIndex; pMsg->prevLogTerm = preLogTerm; pMsg->commitIndex = pSyncNode->commitIndex; + pMsg->privateTerm = syncIndexMgrGetTerm(pSyncNode->pNextIndex, pDestId); // send msg syncNodeAppendEntries(pSyncNode, pDestId, pMsg); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 36d9225b30..4b60939edf 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -14,6 +14,7 @@ */ #include "syncSnapshot.h" +#include "syncIndexMgr.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncUtil.h" @@ -42,6 +43,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode = pSyncNode; pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; + pSender->privateTerm = 0; pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; @@ -90,9 +92,13 @@ void snapshotSenderDoStart(SSyncSnapshotSender *pSender) { sTrace("snapshot send begin seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr); taosMemoryFree(msgStr); + // when start, increase term + ++(pSender->privateTerm); + syncSnapshotSendDestroy(pMsg); } +#if 0 // when entry in snapshot, start sender void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (!(pSender->start)) { @@ -142,6 +148,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { sInfo("snapshotSenderStart %s", s); taosMemoryFree(s); } +#endif void snapshotSenderStop(SSyncSnapshotSender *pSender) { if (pSender->pReader != NULL) { @@ -392,6 +399,9 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddNumberToObject(pRoot, "replicaIndex", pReceiver->replicaIndex); snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->term); cJSON_AddStringToObject(pRoot, "term", u64buf); + + snprintf(u64buf, sizeof(u64buf), "%lu", pReceiver->privateTerm); + cJSON_AddStringToObject(pRoot, "privateTerm", u64buf); } cJSON *pJson = cJSON_CreateObject(); @@ -514,6 +524,10 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { pSender->finish = true; snapshotSenderStop(pSender); + + // update nextIndex private term + syncIndexMgrSetTerm(pSyncNode->pNextIndex, &(pMsg->srcId), pSender->privateTerm); + return 0; } diff --git a/source/libs/sync/test/syncIndexMgrTest.cpp b/source/libs/sync/test/syncIndexMgrTest.cpp index 34a6eeb461..e920ae3ac6 100644 --- a/source/libs/sync/test/syncIndexMgrTest.cpp +++ b/source/libs/sync/test/syncIndexMgrTest.cpp @@ -25,7 +25,6 @@ SRaftId ids[TSDB_MAX_REPLICA]; SSyncNode* pSyncNode; SSyncNode* syncNodeInit() { - pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); memset(pSyncNode, 0, sizeof(SSyncNode)); pSyncNode->replicaNum = replicaNum; @@ -81,7 +80,7 @@ int main(int argc, char** argv) { printf("---------------------------------------\n"); for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) { SyncIndex idx = syncIndexMgrGetIndex(pSyncIndexMgr, &ids[i]); - SyncTerm term = syncIndexMgrGetTerm(pSyncIndexMgr, &ids[i]); + SyncTerm term = syncIndexMgrGetTerm(pSyncIndexMgr, &ids[i]); printf("%d: index:%ld term:%lu \n", i, idx, term); } printf("---------------------------------------\n"); diff --git a/source/libs/sync/test/syncSnapshotReceiverTest.cpp b/source/libs/sync/test/syncSnapshotReceiverTest.cpp index a9e87ba12d..69670f09a6 100644 --- a/source/libs/sync/test/syncSnapshotReceiverTest.cpp +++ b/source/libs/sync/test/syncSnapshotReceiverTest.cpp @@ -46,6 +46,7 @@ SSyncSnapshotReceiver* createReceiver() { pReceiver->ack = 20; pReceiver->pWriter = (void*)0x11; pReceiver->term = 66; + pReceiver->privateTerm = 99; return pReceiver; } diff --git a/source/libs/sync/test/syncSnapshotRspTest.cpp b/source/libs/sync/test/syncSnapshotRspTest.cpp index 3c1ce11456..d624d45c94 100644 --- a/source/libs/sync/test/syncSnapshotRspTest.cpp +++ b/source/libs/sync/test/syncSnapshotRspTest.cpp @@ -21,6 +21,7 @@ SyncSnapshotRsp *createMsg() { pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; pMsg->term = 11; + pMsg->privateTerm = 99; pMsg->lastIndex = 22; pMsg->lastTerm = 33; pMsg->ack = 44; diff --git a/source/libs/sync/test/syncSnapshotSendTest.cpp b/source/libs/sync/test/syncSnapshotSendTest.cpp index ca4ac2abeb..01d3264693 100644 --- a/source/libs/sync/test/syncSnapshotSendTest.cpp +++ b/source/libs/sync/test/syncSnapshotSendTest.cpp @@ -21,6 +21,7 @@ SyncSnapshotSend *createMsg() { pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678); pMsg->destId.vgId = 100; pMsg->term = 11; + pMsg->privateTerm = 99; pMsg->lastIndex = 22; pMsg->lastTerm = 33; pMsg->seq = 44; diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index f772b225fe..c23d06ee38 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -308,7 +308,7 @@ int main(int argc, char** argv) { // check parameter assert(replicaNum >= 1 && replicaNum <= 5); - //assert(myIndex >= 0 && myIndex < replicaNum); + // assert(myIndex >= 0 && myIndex < replicaNum); assert(lastApplyIndex >= -1); assert(lastApplyTerm >= 0); assert(writeRecordNum >= 0);