From 43e2f8bc2bea1cf6a95741d764bcae9786ac1840 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 26 Jul 2022 10:59:56 +0800 Subject: [PATCH] refactor(sync): add pre-commit interface --- source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncAppendEntries.c | 46 +++++------------------- source/libs/sync/src/syncMain.c | 33 ++--------------- 3 files changed, 12 insertions(+), 68 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 64f66e390a..b802d94bea 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -238,6 +238,7 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg); int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); +int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code); int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4295abeaa1..7d0f53640c 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -244,22 +244,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); // pre commit - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); - if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { - if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pAppendEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pAppendEntry->isWeak; - cbMeta.code = 2; - cbMeta.state = ths->state; - cbMeta.seqNum = pAppendEntry->seqNum; - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - rpcFreeCont(rpcMsg.pCont); + syncNodePreCommit(ths, pAppendEntry, 0); } // free memory @@ -280,22 +265,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); // pre commit - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); - if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { - if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pAppendEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pAppendEntry->isWeak; - cbMeta.code = 3; - cbMeta.state = ths->state; - cbMeta.seqNum = pAppendEntry->seqNum; - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - rpcFreeCont(rpcMsg.pCont); + syncNodePreCommit(ths, pAppendEntry, 0); // free memory syncEntryDestory(pAppendEntry); @@ -440,7 +410,7 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { return code; } -static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { +int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); @@ -456,7 +426,7 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { cbMeta.index = pEntry->index; cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = 2; + cbMeta.code = code; cbMeta.state = ths->state; cbMeta.seqNum = pEntry->seqNum; ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); @@ -594,7 +564,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc return -1; } - code = syncNodePreCommit(ths, pAppendEntry); + code = syncNodePreCommit(ths, pAppendEntry, 0); ASSERT(code == 0); // syncEntryDestory(pAppendEntry); @@ -715,7 +685,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc return -1; } - code = syncNodePreCommit(ths, pAppendEntry); + code = syncNodePreCommit(ths, pAppendEntry, 0); ASSERT(code == 0); // syncEntryDestory(pAppendEntry); @@ -919,7 +889,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs } // pre commit - code = syncNodePreCommit(ths, pAppendEntry); + code = syncNodePreCommit(ths, pAppendEntry, 0); ASSERT(code == 0); // update match index @@ -1032,7 +1002,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs } // pre commit - code = syncNodePreCommit(ths, pAppendEntry); + code = syncNodePreCommit(ths, pAppendEntry, 0); ASSERT(code == 0); syncEntryDestory(pAppendEntry); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a453b2572c..63a404d5f6 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2504,23 +2504,11 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI } // pre commit + syncNodePreCommit(ths, pEntry, 0); + SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = ths->state; - cbMeta.seqNum = pEntry->seqNum; - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - rpcFreeCont(rpcMsg.pCont); - // if only myself, maybe commit right now if (ths->replicaNum == 1) { syncMaybeAdvanceCommitIndex(ths); @@ -2528,22 +2516,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI } else { // pre commit - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); - - if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = 1; - cbMeta.state = ths->state; - cbMeta.seqNum = pEntry->seqNum; - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - rpcFreeCont(rpcMsg.pCont); + syncNodePreCommit(ths, pEntry, 0); } if (pRetIndex != NULL) {