diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 9b6593e4b5..1eed353f33 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -78,6 +78,8 @@ typedef struct SFsmCbMeta { int32_t code; ESyncState state; uint64_t seqNum; + SyncTerm term; + SyncTerm currentTerm; } SFsmCbMeta; typedef struct SSyncFSM { @@ -85,6 +87,7 @@ typedef struct SSyncFSM { void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); + void (*FpRestoreFinish)(struct SSyncFSM* pFsm); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot); } SSyncFSM; @@ -117,7 +120,6 @@ typedef struct SSyncLogStore { } SSyncLogStore; - typedef struct SSyncInfo { SyncGroupId vgId; SSyncCfg syncCfg; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 68f1d0cd8e..b7cae58b12 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -27,43 +27,37 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSdbRaw *pRaw = pMsg->pCont; - SSnapshot snapshot = {0}; - (*pFsm->FpGetSnapshot)(pFsm, &snapshot); - - if (cbMeta.index > snapshot.lastApplyIndex) { - mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state)); - sdbWriteWithoutFree(pSdb, pRaw); - sdbSetApplyIndex(pSdb, cbMeta.index); - if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { - tsem_post(&pMgmt->syncSem); - } - } else { - mTrace("ver:%" PRId64 ", already apply raw:%p to sdb, last:%" PRId64, cbMeta.index, pRaw, snapshot.lastApplyIndex); + mTrace("ver:%" PRId64 ", apply raw:%p to sdb, role:%s", cbMeta.index, pRaw, syncStr(cbMeta.state)); + sdbWriteWithoutFree(pSdb, pRaw); + sdbSetApplyIndex(pSdb, cbMeta.index); + sdbSetApplyTerm(pSdb, cbMeta.term); + if (cbMeta.state == TAOS_SYNC_STATE_LEADER) { + tsem_post(&pMgmt->syncSem); } } -static void mndSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - // strict consistent, do nothing -} - -static void mndSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - // strict consistent, do nothing -} - -static int32_t mndSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { +int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { SMnode *pMnode = pFsm->data; pSnapshot->lastApplyIndex = sdbGetApplyIndex(pMnode->pSdb); pSnapshot->lastApplyTerm = sdbGetApplyTerm(pMnode->pSdb); return 0; } +void mndRestoreFinish(struct SSyncFSM *pFsm) { + SMnode *pMnode = pFsm->data; + mndTransPullup(pMnode); + pMnode->syncMgmt.restored = true; +} + SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM)); pFsm->data = pMnode; pFsm->FpCommitCb = mndSyncCommitMsg; - pFsm->FpPreCommitCb = mndSyncPreCommitMsg; - pFsm->FpRollBackCb = mndSyncRollBackMsg; + pFsm->FpPreCommitCb = NULL; + pFsm->FpRollBackCb = NULL; pFsm->FpGetSnapshot = mndSyncGetSnapshot; + pFsm->FpRestoreFinish = mndRestoreFinish; + pFsm->FpRestoreSnapshot = NULL; return pFsm; } @@ -152,21 +146,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { } void mndSyncStart(SMnode *pMnode) { - SSdb *pSdb = pMnode->pSdb; - int64_t lastApplyIndex = sdbGetApplyIndex(pSdb); - syncSetMsgCb(pMnode->syncMgmt.sync, &pMnode->msgCb); syncStart(pMnode->syncMgmt.sync); - - int64_t applyIndex = sdbGetApplyIndex(pSdb); - mndTransPullup(pMnode); - mDebug("pullup trans finished, applyIndex:%" PRId64, applyIndex); - if (applyIndex != lastApplyIndex) { - mInfo("sdb restored from %" PRId64 " to %" PRId64 ", write file", lastApplyIndex, applyIndex); - sdbWriteFile(pSdb); - } - - pMnode->syncMgmt.restored = true; mDebug("sync:%" PRId64 " is started", pMnode->syncMgmt.sync); } diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index f65a317694..b69c087b5f 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -36,8 +36,8 @@ typedef struct SSyncIO { STaosQueue *pMsgQ; STaosQset * pQset; TdThread consumerTid; - void *serverRpc; - void *clientRpc; + void * serverRpc; + void * clientRpc; SEpSet myAddr; SMsgCb msgcb; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 768e1c1cf1..d3345620e9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -147,6 +147,11 @@ typedef struct SSyncNode { // tools SSyncRespMgr* pSyncRespMgr; + // restore state + bool restoreFinish; + sem_t restoreSem; + SSnapshot* pSnapshot; + } SSyncNode; // open/close -------------- diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 1a5d418e75..ef66d580fb 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -324,7 +324,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - // if (ths->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -332,7 +331,18 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.code = 0; cbMeta.state = ths->state; cbMeta.seqNum = pEntry->seqNum; + cbMeta.term = pEntry->term; + cbMeta.currentTerm = ths->pRaftStore->currentTerm; ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + + bool needExecute = true; + if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) { + needExecute = false; + } + + if (needExecute) { + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + } } // config change @@ -349,6 +359,18 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } } + // restore finish + if (pEntry->index == ths->pLogStore->getLastIndex(ths->pLogStore)) { + if (ths->restoreFinish == false) { + if (ths->pFsm->FpRestoreFinish != NULL) { + ths->pFsm->FpRestoreFinish(ths->pFsm); + } + ths->restoreFinish = true; + + tsem_post(&ths->restoreSem); + } + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 0f17cf267e..aa4bcb4fae 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -102,7 +102,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - // if (pSyncNode->pFsm->FpCommitCb != NULL && pEntry->originalRpcType != TDMT_VND_SYNC_NOOP) { if (pSyncNode->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta; cbMeta.index = pEntry->index; @@ -110,7 +109,17 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { cbMeta.code = 0; cbMeta.state = pSyncNode->state; cbMeta.seqNum = pEntry->seqNum; - pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); + cbMeta.term = pEntry->term; + cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; + + bool needExecute = true; + if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) { + needExecute = false; + } + + if (needExecute) { + pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta); + } } // config change @@ -127,6 +136,18 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } } + // restore finish + if (pEntry->index == pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore)) { + if (pSyncNode->restoreFinish == false) { + if (pSyncNode->pFsm->FpRestoreFinish != NULL) { + pSyncNode->pFsm->FpRestoreFinish(pSyncNode->pFsm); + } + pSyncNode->restoreFinish = true; + + tsem_post(&pSyncNode->restoreSem); + } + } + rpcFreeCont(rpcMsg.pCont); syncEntryDestory(pEntry); } @@ -162,4 +183,4 @@ bool syncAgree(SSyncNode* pSyncNode, SyncIndex index) { } } return false; -} \ No newline at end of file +} diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f4f09d0f7b..bffebc1693 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -242,7 +242,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { return ret; } -void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) { +void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); if (pSyncNode == NULL) { sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid); @@ -492,6 +492,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pSyncRespMgr = syncRespMgrCreate(NULL, 0); assert(pSyncNode->pSyncRespMgr != NULL); + // restore state + pSyncNode->restoreFinish = false; + pSyncNode->pSnapshot = NULL; + if (pSyncNode->pFsm->FpGetSnapshot != NULL) { + pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); + pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); + } + tsem_init(&(pSyncNode->restoreSem), 0, 0); + // start in syncNodeStart // start raft // syncNodeBecomeFollower(pSyncNode); @@ -511,6 +520,8 @@ void syncNodeStart(SSyncNode* pSyncNode) { // use this now syncNodeAppendNoop(pSyncNode); syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica + + tsem_wait(&pSyncNode->restoreSem); return; } @@ -520,6 +531,8 @@ void syncNodeStart(SSyncNode* pSyncNode) { int32_t ret = 0; // ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); + + tsem_wait(&pSyncNode->restoreSem); } void syncNodeStartStandBy(SSyncNode* pSyncNode) { @@ -556,6 +569,12 @@ void syncNodeClose(SSyncNode* pSyncNode) { taosMemoryFree(pSyncNode->pFsm); } + if (pSyncNode->pSnapshot != NULL) { + taosMemoryFree(pSyncNode->pSnapshot); + } + + tsem_destroy(&pSyncNode->restoreSem); + // free memory in syncFreeNode // taosMemoryFree(pSyncNode); } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index cff692239a..0850ef6343 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -73,12 +73,17 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; } +void FpRestoreFinishCb(struct SSyncFSM* pFsm) { + sTrace("==callback== ==FpRestoreFinishCb=="); +} + SSyncFSM* createFsm() { SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); pFsm->FpCommitCb = CommitCb; pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpRestoreFinish = FpRestoreFinishCb; return pFsm; } diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 62bda5b22e..8ccd698907 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -160,6 +160,8 @@ SyncClientRequest *step1(const SRpcMsg *pMsg) { } int main(int argc, char **argv) { + sprintf(tsTempDir, "%s", "."); + // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64;