diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index b43eafc918..6635bdbb96 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -80,6 +80,7 @@ typedef struct SFsmCbMeta { uint64_t seqNum; SyncTerm term; SyncTerm currentTerm; + uint64_t flag; } SFsmCbMeta; typedef struct SReConfigCbMeta { diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index c9e16c53c8..4899839b29 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -333,7 +333,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { cbMeta.seqNum = pEntry->seqNum; cbMeta.term = pEntry->term; cbMeta.currentTerm = ths->pRaftStore->currentTerm; - ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + cbMeta.flag = 9; bool needExecute = true; if (ths->pSnapshot != NULL && cbMeta.index <= ths->pSnapshot->lastApplyIndex) { diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index a3d480956e..d8f693da6e 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -111,6 +111,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { cbMeta.seqNum = pEntry->seqNum; cbMeta.term = pEntry->term; cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; + cbMeta.flag = 7; bool needExecute = true; if (pSyncNode->pSnapshot != NULL && cbMeta.index <= pSyncNode->pSnapshot->lastApplyIndex) { diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 914ce68245..9b7440e48b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -941,12 +941,13 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { int len = 256; char* s = (char*)taosMemoryMalloc(len); snprintf(s, len, - "syncNode2SimpleStr vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, electTimerLogicClock:%lu, " + "syncNode2SimpleStr vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, " + "electTimerLogicClock:%lu, " "electTimerLogicClockUser:%lu, " "electTimerMS:%d", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, pSyncNode->state, - syncUtilState2String(pSyncNode->state), pSyncNode->electTimerLogicClock, pSyncNode->electTimerLogicClockUser, - pSyncNode->electTimerMS); + syncUtilState2String(pSyncNode->state), pSyncNode->pRaftCfg->isStandBy, pSyncNode->electTimerLogicClock, + pSyncNode->electTimerLogicClockUser, pSyncNode->electTimerMS); return s; } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index f52fef0019..fa453577e7 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -43,8 +43,8 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { if (cbMeta.index > beginIndex) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), "==callback== ==CommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } else { sTrace("==callback== ==CommitCb== do not apply again %ld", cbMeta.index); @@ -54,15 +54,15 @@ void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { void PreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; snprintf(logBuf, sizeof(logBuf), - "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index, - cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + "==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", pFsm, cbMeta.index, + cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } void RollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { char logBuf[256]; - snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", - pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); + snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s flag:%lu\n", + pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), cbMeta.flag); syncRpcMsgLog2(logBuf, (SRpcMsg*)pMsg); } @@ -109,6 +109,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* syncInfo.pFsm = createFsm(); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); syncInfo.pWal = pWal; + syncInfo.isStandBy = isStandBy; SSyncCfg* pCfg = &syncInfo.syncCfg; @@ -212,11 +213,15 @@ int main(int argc, char** argv) { int64_t rid = createSyncNode(replicaNum, myIndex, gVgId, pWal, (char*)gDir, isStandBy); assert(rid > 0); - if (isStandBy) { - syncStartStandBy(rid); - } else { - syncStart(rid); - } + syncStart(rid); + + /* + if (isStandBy) { + syncStartStandBy(rid); + } else { + syncStart(rid); + } + */ SSyncNode* pSyncNode = (SSyncNode*)syncNodeAcquire(rid); assert(pSyncNode != NULL);