refactor(sync): add last config index in fsm cbMeta
This commit is contained in:
parent
455a8da0ad
commit
9ab28e0b88
|
@ -62,6 +62,7 @@ typedef struct SSyncCfg {
|
||||||
|
|
||||||
typedef struct SFsmCbMeta {
|
typedef struct SFsmCbMeta {
|
||||||
SyncIndex index;
|
SyncIndex index;
|
||||||
|
SyncIndex lastConfigIndex;
|
||||||
bool isWeak;
|
bool isWeak;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
ESyncState state;
|
ESyncState state;
|
||||||
|
@ -75,6 +76,7 @@ typedef struct SReConfigCbMeta {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
SyncIndex index;
|
SyncIndex index;
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
|
SyncIndex lastConfigIndex;
|
||||||
SyncTerm currentTerm;
|
SyncTerm currentTerm;
|
||||||
SSyncCfg oldCfg;
|
SSyncCfg oldCfg;
|
||||||
SSyncCfg newCfg;
|
SSyncCfg newCfg;
|
||||||
|
|
|
@ -171,7 +171,8 @@ void syncNodeClose(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak);
|
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak);
|
||||||
|
|
||||||
// option
|
// option
|
||||||
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||||
|
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
|
||||||
|
|
||||||
// ping --------------
|
// ping --------------
|
||||||
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
|
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
|
||||||
|
|
|
@ -208,8 +208,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||||
|
|
||||||
SFsmCbMeta cbMeta;
|
SFsmCbMeta cbMeta = {0};
|
||||||
cbMeta.index = pRollBackEntry->index;
|
cbMeta.index = pRollBackEntry->index;
|
||||||
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||||
cbMeta.isWeak = pRollBackEntry->isWeak;
|
cbMeta.isWeak = pRollBackEntry->isWeak;
|
||||||
cbMeta.code = 0;
|
cbMeta.code = 0;
|
||||||
cbMeta.state = ths->state;
|
cbMeta.state = ths->state;
|
||||||
|
@ -234,8 +235,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
if (ths->pFsm != NULL) {
|
if (ths->pFsm != NULL) {
|
||||||
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||||
SFsmCbMeta cbMeta;
|
SFsmCbMeta cbMeta = {0};
|
||||||
cbMeta.index = pAppendEntry->index;
|
cbMeta.index = pAppendEntry->index;
|
||||||
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||||
cbMeta.isWeak = pAppendEntry->isWeak;
|
cbMeta.isWeak = pAppendEntry->isWeak;
|
||||||
cbMeta.code = 2;
|
cbMeta.code = 2;
|
||||||
cbMeta.state = ths->state;
|
cbMeta.state = ths->state;
|
||||||
|
@ -266,8 +268,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
if (ths->pFsm != NULL) {
|
if (ths->pFsm != NULL) {
|
||||||
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
// if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) {
|
||||||
SFsmCbMeta cbMeta;
|
SFsmCbMeta cbMeta = {0};
|
||||||
cbMeta.index = pAppendEntry->index;
|
cbMeta.index = pAppendEntry->index;
|
||||||
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||||
cbMeta.isWeak = pAppendEntry->isWeak;
|
cbMeta.isWeak = pAppendEntry->isWeak;
|
||||||
cbMeta.code = 3;
|
cbMeta.code = 3;
|
||||||
cbMeta.state = ths->state;
|
cbMeta.state = ths->state;
|
||||||
|
@ -696,8 +699,9 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||||
|
|
||||||
SFsmCbMeta cbMeta;
|
SFsmCbMeta cbMeta = {0};
|
||||||
cbMeta.index = pRollBackEntry->index;
|
cbMeta.index = pRollBackEntry->index;
|
||||||
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||||
cbMeta.isWeak = pRollBackEntry->isWeak;
|
cbMeta.isWeak = pRollBackEntry->isWeak;
|
||||||
cbMeta.code = 0;
|
cbMeta.code = 0;
|
||||||
cbMeta.state = ths->state;
|
cbMeta.state = ths->state;
|
||||||
|
@ -725,8 +729,9 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
||||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||||
if (ths->pFsm != NULL) {
|
if (ths->pFsm != NULL) {
|
||||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
||||||
SFsmCbMeta cbMeta;
|
SFsmCbMeta cbMeta = {0};
|
||||||
cbMeta.index = pEntry->index;
|
cbMeta.index = pEntry->index;
|
||||||
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||||
cbMeta.isWeak = pEntry->isWeak;
|
cbMeta.isWeak = pEntry->isWeak;
|
||||||
cbMeta.code = 2;
|
cbMeta.code = 2;
|
||||||
cbMeta.state = ths->state;
|
cbMeta.state = ths->state;
|
||||||
|
|
|
@ -441,6 +441,21 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) {
|
||||||
|
ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1);
|
||||||
|
SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0];
|
||||||
|
|
||||||
|
for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) {
|
||||||
|
if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex &&
|
||||||
|
(pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotLastApplyIndex) {
|
||||||
|
lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
sTrace("sync syncNodeGetSnapshotConfigIndex index:%ld lastConfigIndex:%ld", snapshotLastApplyIndex, lastIndex);
|
||||||
|
return lastIndex;
|
||||||
|
}
|
||||||
|
|
||||||
const char* syncGetMyRoleStr(int64_t rid) {
|
const char* syncGetMyRoleStr(int64_t rid) {
|
||||||
const char* s = syncUtilState2String(syncGetMyRole(rid));
|
const char* s = syncUtilState2String(syncGetMyRole(rid));
|
||||||
return s;
|
return s;
|
||||||
|
@ -2065,8 +2080,9 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
|
||||||
if (ths->pFsm != NULL) {
|
if (ths->pFsm != NULL) {
|
||||||
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
||||||
SFsmCbMeta cbMeta;
|
SFsmCbMeta cbMeta = {0};
|
||||||
cbMeta.index = pEntry->index;
|
cbMeta.index = pEntry->index;
|
||||||
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||||
cbMeta.isWeak = pEntry->isWeak;
|
cbMeta.isWeak = pEntry->isWeak;
|
||||||
cbMeta.code = 0;
|
cbMeta.code = 0;
|
||||||
cbMeta.state = ths->state;
|
cbMeta.state = ths->state;
|
||||||
|
@ -2087,8 +2103,9 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
|
||||||
if (ths->pFsm != NULL) {
|
if (ths->pFsm != NULL) {
|
||||||
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
|
||||||
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
|
||||||
SFsmCbMeta cbMeta;
|
SFsmCbMeta cbMeta = {0};
|
||||||
cbMeta.index = pEntry->index;
|
cbMeta.index = pEntry->index;
|
||||||
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||||
cbMeta.isWeak = pEntry->isWeak;
|
cbMeta.isWeak = pEntry->isWeak;
|
||||||
cbMeta.code = 1;
|
cbMeta.code = 1;
|
||||||
cbMeta.state = ths->state;
|
cbMeta.state = ths->state;
|
||||||
|
@ -2152,11 +2169,12 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
if (ths->pFsm->FpLeaderTransferCb != NULL) {
|
if (ths->pFsm->FpLeaderTransferCb != NULL) {
|
||||||
SFsmCbMeta cbMeta;
|
SFsmCbMeta cbMeta = {0};
|
||||||
cbMeta.code = 0;
|
cbMeta.code = 0;
|
||||||
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||||
cbMeta.flag = 0;
|
cbMeta.flag = 0;
|
||||||
cbMeta.index = pEntry->index;
|
cbMeta.index = pEntry->index;
|
||||||
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||||
cbMeta.isWeak = pEntry->isWeak;
|
cbMeta.isWeak = pEntry->isWeak;
|
||||||
cbMeta.seqNum = pEntry->seqNum;
|
cbMeta.seqNum = pEntry->seqNum;
|
||||||
cbMeta.state = ths->state;
|
cbMeta.state = ths->state;
|
||||||
|
@ -2258,6 +2276,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
||||||
cbMeta.code = 0;
|
cbMeta.code = 0;
|
||||||
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
|
||||||
cbMeta.index = pEntry->index;
|
cbMeta.index = pEntry->index;
|
||||||
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index);
|
||||||
cbMeta.term = pEntry->term;
|
cbMeta.term = pEntry->term;
|
||||||
cbMeta.newCfg = newSyncCfg;
|
cbMeta.newCfg = newSyncCfg;
|
||||||
cbMeta.oldCfg = oldSyncCfg;
|
cbMeta.oldCfg = oldSyncCfg;
|
||||||
|
@ -2292,8 +2311,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
||||||
|
|
||||||
// user commit
|
// user commit
|
||||||
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||||
SFsmCbMeta cbMeta;
|
SFsmCbMeta cbMeta = {0};
|
||||||
cbMeta.index = pEntry->index;
|
cbMeta.index = pEntry->index;
|
||||||
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||||
cbMeta.isWeak = pEntry->isWeak;
|
cbMeta.isWeak = pEntry->isWeak;
|
||||||
cbMeta.code = 0;
|
cbMeta.code = 0;
|
||||||
cbMeta.state = ths->state;
|
cbMeta.state = ths->state;
|
||||||
|
|
Loading…
Reference in New Issue