From 97b1e95ad8ea575d047d87a61888c3f9f1880332 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 12:14:53 +0800 Subject: [PATCH 1/4] enh(sync) sync/mnode integration, add term, currentTerm in cbMeta --- include/libs/sync/sync.h | 2 ++ source/dnode/mnode/impl/src/mndSync.c | 32 ++++++++++++++++++------ source/libs/sync/src/syncAppendEntries.c | 2 ++ source/libs/sync/src/syncCommit.c | 4 ++- 4 files changed, 32 insertions(+), 8 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 9b6593e4b5..872785abe5 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -78,6 +78,8 @@ typedef struct SFsmCbMeta { int32_t code; ESyncState state; uint64_t seqNum; + SyncTerm term; + SyncTerm currentTerm; } SFsmCbMeta; typedef struct SSyncFSM { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 0b7ed04819..5eec46056b 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -139,14 +139,32 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; - pApplyMsg->info.node = pFsm->data; - mndProcessApplyMsg(pApplyMsg); - sdbUpdateVer(pMnode->pSdb, 1); + if (cbMeta.term < cbMeta.currentTerm) { + // restoring + + SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; + pApplyMsg->info.node = pFsm->data; + mndProcessApplyMsg(pApplyMsg); + //sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term); + + // mndTransPullup(pMnode); + + } else if (cbMeta.term == cbMeta.currentTerm) { + // restore finish + + SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; + pApplyMsg->info.node = pFsm->data; + mndProcessApplyMsg(pApplyMsg); + + //sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term); + + if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { + tsem_post(&pMgmt->syncSem); + } + } else { + ASSERT(0); + } - if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { - tsem_post(&pMgmt->syncSem); - } } } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1a5d418e75..bf1eeb4186 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -332,6 +332,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.code = 0; cbMeta.state = ths->state; cbMeta.seqNum = pEntry->seqNum; + cbMeta.term = pEntry->term; + cbMeta.currentTerm = ths->pRaftStore->currentTerm; ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 0f17cf267e..01e408e543 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -110,6 +110,8 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { cbMeta.code = 0; cbMeta.state = pSyncNode->state; cbMeta.seqNum = pEntry->seqNum; + cbMeta.term = pEntry->term; + cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); } @@ -162,4 +164,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { } } return false; -} \ No newline at end of file +} From 4535722957103e94d3bbd595047af4bc92079238 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 15:41:04 +0800 Subject: [PATCH 2/4] enh(sync) sync/mnode integration, syncStart async -> sync --- include/libs/sync/sync.h | 2 +- source/libs/sync/inc/syncIO.h | 4 ++-- source/libs/sync/inc/syncInt.h | 5 +++++ source/libs/sync/src/syncAppendEntries.c | 20 ++++++++++++++++- source/libs/sync/src/syncCommit.c | 25 ++++++++++++++++++---- source/libs/sync/src/syncMain.c | 21 +++++++++++++++++- source/libs/sync/test/syncSnapshotTest.cpp | 2 ++ 7 files changed, 70 insertions(+), 9 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 872785abe5..1eed353f33 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -87,6 +87,7 @@ typedef struct SSyncFSM { void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); + void (*FpRestoreFinish)(struct SSyncFSM* pFsm); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); } SSyncFSM; @@ -119,7 +120,6 @@ typedef struct SSyncLogStore { } SSyncLogStore; - typedef struct SSyncInfo { SyncGroupId vgId; SSyncCfg syncCfg; diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index f65a317694..b69c087b5f 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -36,8 +36,8 @@ typedef struct SSyncIO { STaosQueue *pMsgQ; STaosQset * pQset; TdThread consumerTid; - void *serverRpc; - void *clientRpc; + void * serverRpc; + void * clientRpc; SEpSet myAddr; SMsgCb msgcb; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 768e1c1cf1..d3345620e9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -147,6 +147,11 @@ typedef struct SSyncNode { // tools SSyncRespMgr* pSyncRespMgr; + // restore state + bool restoreFinish; + sem_t restoreSem; + SSnapshot* pSnapshot; + } SSyncNode; // open/close -------------- diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index bf1eeb4186..18735f5b71 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -324,7 +324,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - // if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -335,6 +334,15 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.term = pEntry->term; cbMeta.currentTerm = ths->pRaftStore->currentTerm; ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + + bool needExecute = true; + if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) { + needExecute = false; + } + + if (needExecute) { + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + } } // config change @@ -351,6 +359,16 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } } + // restore finish + if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) { + if (ths->restoreFinish == false) { + ths->pFsm->FpRestoreFinish(ths->pFsm); + ths->restoreFinish = true; + + tsem_post(&ths->restoreSem); + } + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 01e408e543..557f69a2ce 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -102,7 +102,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - // if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -110,9 +109,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { cbMeta.code = 0; cbMeta.state = pSyncNode->state; cbMeta.seqNum = pEntry->seqNum; - cbMeta.term = pEntry->term; - cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; - pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); + cbMeta.term = pEntry->term; + cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; + + bool needExecute = true; + if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) { + needExecute = false; + } + + if (needExecute) { + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); + } } // config change @@ -129,6 +136,16 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } } + // restore finish + if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) { + if (pSyncNode->restoreFinish == false) { + pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); + pSyncNode->restoreFinish = true; + + tsem_post(&pSyncNode->restoreSem); + } + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f4f09d0f7b..bffebc1693 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -242,7 +242,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { return ret; } -void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) { +void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid); @@ -492,6 +492,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0); assert(pSyncNode->pSyncRespMgr != NULL); + // restore state + pSyncNode->restoreFinish = false; + pSyncNode->pSnapshot = NULL; + if (pSyncNode->pFsm->FpGetSnapshot != NULL) { + pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); + } + tsem_init(&(pSyncNode->restoreSem), 0, 0); + // start in syncNodeStart // start raft // syncNodeBecomeFollower(pSyncNode); @@ -511,6 +520,8 @@ void syncNodeStart(SSyncNode* pSyncNode) { // use this now syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica + + tsem_wait(&pSyncNode->restoreSem); return; } @@ -520,6 +531,8 @@ void syncNodeStart(SSyncNode* pSyncNode) { int32_t ret = 0; // ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); + + tsem_wait(&pSyncNode->restoreSem); } void syncNodeStartStandBy(SSyncNode* pSyncNode) { @@ -556,6 +569,12 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pFsm); } + if (pSyncNode->pSnapshot != NULL) { + taosMemoryFree(pSyncNode->pSnapshot); + } + + tsem_destroy(&pSyncNode->restoreSem); + // free memory in syncFreeNode // taosMemoryFree(pSyncNode); } diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 62bda5b22e..8ccd698907 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -160,6 +160,8 @@ SyncClientRequest *step1(const SRpcMsg *pMsg) { } int main(int argc, char **argv) { + sprintf(tsTempDir, "%s", "."); + // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; From dc82c1ba226f74805edbd519b959b4254b805572 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 16:00:03 +0800 Subject: [PATCH 3/4] enh(sync) sync/mnode integration, syncStart async -> sync --- source/dnode/mnode/impl/src/mndSync.c | 39 ++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 5eec46056b..16c6b7f7fa 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -128,6 +128,18 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); } void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { + SRpcMsg *pApplyMsg = (SRpcMsg *)pMsg; + pApplyMsg->info.node = pFsm->data; + mndProcessApplyMsg(pApplyMsg); + //sdbUpdateVer(pMnode->pSdb, 1); ==> sdbSetVer(cbMeta.index, cbMeta.term); + + if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { + SMnode *pMnode = pFsm->data; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + tsem_post(&pMgmt->syncSem); + } + +#if 0 SyncIndex beginIndex = SYNC_INDEX_INVALID; if (pFsm->FpGetSnapshot != NULL) { SSnapshot snapshot = {0}; @@ -166,8 +178,23 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM } } +#endif + } +int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { + SMnode *pMnode = pFsm->data; + pSnapshot->lastApplyIndex = pMnode->pSdb->lastCommitVer; + //pSnapshot->lastApplyTerm = ...; + return 0; +} + +void mndRestoreFinish(struct SSyncFSM* pFsm) { + SMnode *pMnode = pFsm->data; + mndTransPullup(pMnode); +} + +#if 0 void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } @@ -175,19 +202,17 @@ void mndSyncPreCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta void mndSyncRollBackMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { // strict consistent, do nothing } - -int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { - // snapshot - return 0; -} +#endif SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; pFsm->FpCommitCb = mndSyncCommitMsg; - pFsm->FpPreCommitCb = mndSyncPreCommitMsg; - pFsm->FpRollBackCb = mndSyncRollBackMsg; + pFsm->FpPreCommitCb = NULL; + pFsm->FpRollBackCb = NULL; pFsm->FpGetSnapshot = mndSyncGetSnapshot; + pFsm->FpRestoreFinish = mndRestoreFinish; + pFsm->FpRestoreSnapshot = NULL; return pFsm; } From ee4b694a3540c9f185c5e349625e6d8a4ca73cd6 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 23 May 2022 18:10:04 +0800 Subject: [PATCH 4/4] enh(sync) sync/mnode integration, syncStart async -> sync --- source/libs/sync/src/syncAppendEntries.c | 4 +++- source/libs/sync/src/syncCommit.c | 4 +++- source/libs/sync/test/syncConfigChangeTest.cpp | 5 +++++ 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 18735f5b71..ef66d580fb 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -362,7 +362,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // restore finish if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) { if (ths->restoreFinish == false) { - ths->pFsm->FpRestoreFinish(ths->pFsm); + if (ths->pFsm->FpRestoreFinish != NULL) { + ths->pFsm->FpRestoreFinish(ths->pFsm); + } ths->restoreFinish = true; tsem_post(&ths->restoreSem); diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 557f69a2ce..aa4bcb4fae 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -139,7 +139,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // restore finish if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) { if (pSyncNode->restoreFinish == false) { - pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); + if (pSyncNode->pFsm->FpRestoreFinish != NULL) { + pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); + } pSyncNode->restoreFinish = true; tsem_post(&pSyncNode->restoreSem); diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index cff692239a..0850ef6343 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -73,12 +73,17 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } +void FpRestoreFinishCb(struct SSyncFSM* pFsm) { + sTrace("==callback== ==FpRestoreFinishCb=="); +} + SSyncFSM* createFsm() { SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); pFsm->FpCommitCb = CommitCb; pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpRestoreFinish = FpRestoreFinishCb; return pFsm; }