fix(sync), call FpCommitCb twice when FOLLOWER

This commit is contained in:
Minghao Li 2022-05-26 11:02:35 +08:00
parent e0d3b53be7
commit 9a2eddda4c
5 changed files with 23 additions and 15 deletions

View File

@ -80,6 +80,7 @@ typedef struct SFsmCbMeta {
uint64_t seqNum;
SyncTerm term;
SyncTerm currentTerm;
uint64_t flag;
} SFsmCbMeta;
typedef struct SReConfigCbMeta {

View File

@ -333,7 +333,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
cbMeta.flag = 9;
bool needExecute = true;
if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) {

View File

@ -111,6 +111,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
cbMeta.flag = 7;
bool needExecute = true;
if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) {

View File

@ -941,12 +941,13 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
int len = 256;
char* s = (char*)taosMemoryMalloc(len);
snprintf(s, len,
"syncNode2SimpleStr vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, electTimerLogicClock:%lu, "
"syncNode2SimpleStr vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, "
"electTimerLogicClock:%lu, "
"electTimerLogicClockUser:%lu, "
"electTimerMS:%d",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->state,
syncUtilState2String(pSyncNode->state), pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser,
pSyncNode->electTimerMS);
syncUtilState2String(pSyncNode->state), pSyncNode->pRaftCfg->isStandBy, pSyncNode->electTimerLogicClock,
pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS);
return s;
}

View File

@ -43,8 +43,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
if (cbMeta.index > beginIndex) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
} else {
sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index);
@ -54,15 +54,15 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm, cbMeta.index,
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}
void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag);
syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg);
}
@ -109,6 +109,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
syncInfo.pWal = pWal;
syncInfo.isStandBy = isStandBy;
SSyncCfg* pCfg = &syncInfo.syncCfg;
@ -212,11 +213,15 @@ int main(int argc, char** argv) {
int64_t rid = createSyncNode(replicaNum, myIndex, gVgId, pWal, (char*)gDir, isStandBy);
assert(rid > 0);
if (isStandBy) {
syncStartStandBy(rid);
} else {
syncStart(rid);
}
syncStart(rid);
/*
if (isStandBy) {
syncStartStandBy(rid);
} else {
syncStart(rid);
}
*/
SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid);
assert(pSyncNode != NULL);