refactor(sync): add pre-commit interface

This commit is contained in:
Minghao Li 2022-07-26 10:59:56 +08:00
parent 886c4c2feb
commit 43e2f8bc2b
3 changed files with 12 additions and 68 deletions

View File

@ -238,6 +238,7 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg); bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code);
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);

View File

@ -244,22 +244,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit // pre commit
SRpcMsg rpcMsg; syncNodePreCommit(ths, pAppendEntry, 0);
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pAppendEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 2;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
} }
// free memory // free memory
@ -280,22 +265,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry);
// pre commit // pre commit
SRpcMsg rpcMsg; syncNodePreCommit(ths, pAppendEntry, 0);
syncEntry2OriginalRpc(pAppendEntry, &rpcMsg);
if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pAppendEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pAppendEntry->isWeak;
cbMeta.code = 3;
cbMeta.state = ths->state;
cbMeta.seqNum = pAppendEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
// free memory // free memory
syncEntryDestory(pAppendEntry); syncEntryDestory(pAppendEntry);
@ -440,7 +410,7 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
return code; return code;
} }
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
@ -456,7 +426,7 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak; cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 2; cbMeta.code = code;
cbMeta.state = ths->state; cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum; cbMeta.seqNum = pEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
@ -594,7 +564,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
return -1; return -1;
} }
code = syncNodePreCommit(ths, pAppendEntry); code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0); ASSERT(code == 0);
// syncEntryDestory(pAppendEntry); // syncEntryDestory(pAppendEntry);
@ -715,7 +685,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
return -1; return -1;
} }
code = syncNodePreCommit(ths, pAppendEntry); code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0); ASSERT(code == 0);
// syncEntryDestory(pAppendEntry); // syncEntryDestory(pAppendEntry);
@ -919,7 +889,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
} }
// pre commit // pre commit
code = syncNodePreCommit(ths, pAppendEntry); code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0); ASSERT(code == 0);
// update match index // update match index
@ -1032,7 +1002,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
} }
// pre commit // pre commit
code = syncNodePreCommit(ths, pAppendEntry); code = syncNodePreCommit(ths, pAppendEntry, 0);
ASSERT(code == 0); ASSERT(code == 0);
syncEntryDestory(pAppendEntry); syncEntryDestory(pAppendEntry);

View File

@ -2504,23 +2504,11 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
} }
// pre commit // pre commit
syncNodePreCommit(ths, pEntry, 0);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
// if only myself, maybe commit right now // if only myself, maybe commit right now
if (ths->replicaNum == 1) { if (ths->replicaNum == 1) {
syncMaybeAdvanceCommitIndex(ths); syncMaybeAdvanceCommitIndex(ths);
@ -2528,22 +2516,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
} else { } else {
// pre commit // pre commit
SRpcMsg rpcMsg; syncNodePreCommit(ths, pEntry, 0);
syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 1;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
}
rpcFreeCont(rpcMsg.pCont);
} }
if (pRetIndex != NULL) { if (pRetIndex != NULL) {