diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index fc926cb1b4..db9979bf12 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -132,27 +132,27 @@ typedef struct SSnapshotMeta { typedef struct SSyncFSM { void* data; - void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); - void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); - void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); + void (*FpCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta *pMeta); + void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + void (*FpRollBackCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); - void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); - void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta); - void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); + void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm); + void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); + void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); - void (*FpBecomeLeaderCb)(struct SSyncFSM* pFsm); - void (*FpBecomeFollowerCb)(struct SSyncFSM* pFsm); + void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm); + void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm); - int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); - int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); + int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); + int32_t (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot); - int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader); - int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); - int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); + int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader); + int32_t (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader); + int32_t (*FpSnapshotDoRead)(const struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); - int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter); - int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot); - int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len); + int32_t (*FpSnapshotStartWrite)(const struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter); + int32_t (*FpSnapshotStopWrite)(const struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot); + int32_t (*FpSnapshotDoWrite)(const struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len); } SSyncFSM; diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 9cb8a9d564..eb3c99fee7 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -28,8 +28,6 @@ typedef struct SRaftId { SyncGroupId vgId; } SRaftId; -int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); -int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo); void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); char* sync2SimpleStr(int64_t rid); diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index ad2a3ec447..dc14a28d6f 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -90,7 +90,6 @@ typedef struct { int32_t errCode; int32_t transId; SRWLatch lock; - int8_t leaderTransferFinish; int8_t selfIndex; int8_t numOfReplicas; SReplica replicas[TSDB_MAX_REPLICA]; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 891f2bbcd8..5668802412 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -428,18 +428,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { void mndPreClose(SMnode *pMnode) { if (pMnode != NULL) { - atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0); syncLeaderTransfer(pMnode->syncMgmt.sync); - -#if 0 - mInfo("vgId:1, mnode start leader transfer"); - // wait for leader transfer finish - while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) { - taosMsleep(10); - mInfo("vgId:1, mnode waiting for leader transfer"); - } - mInfo("vgId:1, mnode finish leader transfer"); -#endif } } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index df4b526775..1cd97b73e6 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -39,25 +39,25 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +void mndSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSdbRaw *pRaw = pMsg->pCont; // delete msg handle SRpcMsg rpcMsg = {0}; - syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info); + rpcMsg.info = pMsg->info; int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); - pMgmt->errCode = cbMeta.code; + pMgmt->errCode = pMeta->code; mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 " role:%s raw:%p", - transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state), + transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), pRaw); if (pMgmt->errCode == 0) { sdbWriteWithoutFree(pMnode->pSdb, pRaw); - sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex); + sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); } taosWLockLatch(&pMgmt->lock); @@ -87,7 +87,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM } } -int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { +int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { mInfo("start to read snapshot from sdb in atomic way"); SMnode *pMnode = pFsm->data; return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, @@ -95,13 +95,13 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pR return 0; } -int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { +int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { SMnode *pMnode = pFsm->data; sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); return 0; } -void mndRestoreFinish(struct SSyncFSM *pFsm) { +void mndRestoreFinish(const SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; if (!pMnode->deploy) { @@ -113,32 +113,30 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { } } -void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {} - -int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { +int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { mInfo("start to read snapshot from sdb"); SMnode *pMnode = pFsm->data; return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL); } -int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { +int32_t mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { mInfo("stop to read snapshot from sdb"); SMnode *pMnode = pFsm->data; return sdbStopRead(pMnode->pSdb, pReader); } -int32_t mndSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { +int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { SMnode *pMnode = pFsm->data; return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len); } -int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { +int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) { mInfo("start to apply snapshot to sdb"); SMnode *pMnode = pFsm->data; return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter); } -int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { +int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); SMnode *pMnode = pFsm->data; @@ -146,18 +144,12 @@ int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, pSnapshot->lastConfigIndex); } -int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { +int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { SMnode *pMnode = pFsm->data; return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len); } -void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - SMnode *pMnode = pFsm->data; - atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 1); - mInfo("vgId:1, mnode leader transfer finish"); -} - -static void mndBecomeFollower(struct SSyncFSM *pFsm) { +static void mndBecomeFollower(const SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; mInfo("vgId:1, become follower"); @@ -172,7 +164,7 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) { taosWUnLockLatch(&pMnode->syncMgmt.lock); } -static void mndBecomeLeader(struct SSyncFSM *pFsm) { +static void mndBecomeLeader(const SSyncFSM *pFsm) { mInfo("vgId:1, become leader"); SMnode *pMnode = pFsm->data; } @@ -184,8 +176,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { pFsm->FpPreCommitCb = NULL; pFsm->FpRollBackCb = NULL; pFsm->FpRestoreFinishCb = mndRestoreFinish; - pFsm->FpLeaderTransferCb = mndLeaderTransfer; - pFsm->FpReConfigCb = mndReConfig; + pFsm->FpLeaderTransferCb = NULL; + pFsm->FpReConfigCb = NULL; pFsm->FpBecomeLeaderCb = mndBecomeLeader; pFsm->FpBecomeFollowerCb = mndBecomeFollower; pFsm->FpGetSnapshot = mndSyncGetSnapshot; @@ -256,32 +248,26 @@ void mndCleanupSync(SMnode *pMnode) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { SSyncMgmt *pMgmt = &pMnode->syncMgmt; - SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; - if (req.contLen <= 0) { - terrno = TSDB_CODE_APP_ERROR; - return -1; - } + pMgmt->errCode = 0; + SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; req.pCont = rpcMallocCont(req.contLen); if (req.pCont == NULL) return -1; memcpy(req.pCont, pRaw, req.contLen); - pMgmt->errCode = 0; taosWLockLatch(&pMgmt->lock); if (pMgmt->transId != 0) { - mError("trans:%d, can't be proposed since trans:%d alrady waiting for confirm", transId, pMgmt->transId); + mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); taosWUnLockLatch(&pMgmt->lock); terrno = TSDB_CODE_APP_NOT_READY; return -1; - } else { - pMgmt->transId = transId; - mInfo("trans:%d, will be proposed", pMgmt->transId); - taosWUnLockLatch(&pMgmt->lock); } - const bool isWeak = false; - int32_t code = syncPropose(pMgmt->sync, &req, isWeak); + mInfo("trans:%d, will be proposed", transId); + pMgmt->transId = transId; + taosWUnLockLatch(&pMgmt->lock); + int32_t code = syncPropose(pMgmt->sync, &req, false); if (code == 0) { mInfo("trans:%d, is proposing and wait sem", pMgmt->transId); tsem_wait(&pMgmt->syncSem); @@ -294,8 +280,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); code = 0; } else { - taosWLockLatch(&pMgmt->lock); mInfo("trans:%d, failed to proposed since %s", transId, terrstr()); + taosWLockLatch(&pMgmt->lock); pMgmt->transId = 0; taosWUnLockLatch(&pMgmt->lock); if (terrno == TSDB_CODE_SYN_NOT_LEADER) { @@ -311,7 +297,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { return code; } - if (pMgmt->errCode != 0) terrno = pMgmt->errCode; + terrno = pMgmt->errCode; return pMgmt->errCode; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 7f2214d298..1d88be42d8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -237,7 +237,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int32_t code = syncProcessMsg(pVnode->sync, pMsg); if (code != 0) { vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg, - TMSG_INFO(pMsg->msgType), terrstr()); + TMSG_INFO(pMsg->msgType), terrstr()); } return code; @@ -278,78 +278,61 @@ static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { +static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot) { vnodeGetSnapshot(pFsm->data, pSnapshot); return 0; } -static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {} +static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { + SVnode *pVnode = pFsm->data; -static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - if (cbMeta.isWeak == 0) { - SVnode *pVnode = pFsm->data; + if (pMeta->code == 0) { + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); + rpcMsg.info = pMsg->info; + rpcMsg.info.conn.applyIndex = pMeta->index; + rpcMsg.info.conn.applyTerm = pMeta->term; - if (cbMeta.code == 0) { - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); - syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); - rpcMsg.info.conn.applyIndex = cbMeta.index; - rpcMsg.info.conn.applyTerm = cbMeta.term; - - vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", - syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak, - cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); + syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, + pMeta->code, pMeta->state, syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType)); - tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); - } else { - SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info}; - vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync), - TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code)); - if (rsp.info.handle != NULL) { - tmsgSendRsp(&rsp); - } + tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); + } else { + SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info}; + vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync), + TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code)); + if (rsp.info.handle != NULL) { + tmsgSendRsp(&rsp); } } } -static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { - if (cbMeta.isWeak == 1) { - SVnode *pVnode = pFsm->data; - vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", - syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, - syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); - - if (cbMeta.code == 0) { - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); - syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); - rpcMsg.info.conn.applyIndex = cbMeta.index; - rpcMsg.info.conn.applyTerm = cbMeta.term; - tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); - } else { - SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info}; - vError("vgId:%d, pre-commit-cb execute error, type:%s, error:0x%x %s", syncGetVgId(pVnode->sync), - TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code)); - if (rsp.info.handle != NULL) { - tmsgSendRsp(&rsp); - } - } +static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { + if (pMeta->isWeak == 0) { + vnodeSyncApplyMsg(pFsm->data, pMsg, pMeta); } } -static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { +static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { + if (pMeta->isWeak == 1) { + vnodeSyncApplyMsg(pFsm->data, pMsg, pMeta); + } +} + +static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SVnode *pVnode = pFsm->data; vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", - syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, - syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); + syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, + syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType)); } #define USE_TSDB_SNAPSHOT -static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { +static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; SSnapshotParam *pSnapshotParam = pParam; @@ -361,7 +344,7 @@ static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void #endif } -static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { +static int32_t vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; int32_t code = vnodeSnapReaderClose(pReader); @@ -372,7 +355,7 @@ static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { #endif } -static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { +static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len); @@ -391,7 +374,7 @@ static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void ** #endif } -static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { +static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; SSnapshotParam *pSnapshotParam = pParam; @@ -415,7 +398,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void #endif } -static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { +static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, @@ -430,7 +413,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool #endif } -static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { +static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len); @@ -442,9 +425,7 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void * #endif } -static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {} - -static void vnodeRestoreFinish(struct SSyncFSM *pFsm) { +static void vnodeRestoreFinish(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; do { @@ -464,7 +445,7 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) { vDebug("vgId:%d, sync restore finished", pVnode->config.vgId); } -static void vnodeBecomeFollower(struct SSyncFSM *pFsm) { +static void vnodeBecomeFollower(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; vDebug("vgId:%d, become follower", pVnode->config.vgId); @@ -478,7 +459,7 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) { taosThreadMutexUnlock(&pVnode->lock); } -static void vnodeBecomeLeader(struct SSyncFSM *pFsm) { +static void vnodeBecomeLeader(const SSyncFSM *pFsm) { SVnode *pVnode = pFsm->data; vDebug("vgId:%d, become leader", pVnode->config.vgId); @@ -500,10 +481,10 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpRestoreFinishCb = vnodeRestoreFinish; - pFsm->FpLeaderTransferCb = vnodeLeaderTransfer; + pFsm->FpLeaderTransferCb = NULL; pFsm->FpBecomeLeaderCb = vnodeBecomeLeader; pFsm->FpBecomeFollowerCb = vnodeBecomeFollower; - pFsm->FpReConfigCb = vnodeSyncReconfig; + pFsm->FpReConfigCb = NULL; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead; pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 69c8feb256..b3edc2ca73 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -737,30 +737,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { syncNodeRelease(pSyncNode); } -int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return TAOS_SYNC_STATE_ERROR; - } - ASSERT(rid == pSyncNode->rid); - - SRespStub stub; - int32_t ret = syncRespMgrGet(pSyncNode->pSyncRespMgr, index, &stub); - if (ret == 1) { - memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg)); - } - - syncNodeRelease(pSyncNode); - return ret; -} - -int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) { - SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) { - return TAOS_SYNC_STATE_ERROR; - } - ASSERT(rid == pSyncNode->rid); - +static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) { SRespStub stub; int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); if (ret == 1) { @@ -768,8 +745,6 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) } sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle); - syncNodeRelease(pSyncNode); - return ret; } void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { @@ -3127,17 +3102,18 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p } if (ths->pFsm->FpLeaderTransferCb != NULL) { - SFsmCbMeta cbMeta = {0}; - cbMeta.code = 0; - cbMeta.currentTerm = ths->pRaftStore->currentTerm; - cbMeta.flag = 0; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pEntry->isWeak; - cbMeta.seqNum = pEntry->seqNum; - cbMeta.state = ths->state; - cbMeta.term = pEntry->term; - ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, cbMeta); + SFsmCbMeta cbMeta = { + cbMeta.code = 0, + cbMeta.currentTerm = ths->pRaftStore->currentTerm, + cbMeta.flag = 0, + cbMeta.index = pEntry->index, + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index), + cbMeta.isWeak = pEntry->isWeak, + cbMeta.seqNum = pEntry->seqNum, + cbMeta.state = ths->state, + cbMeta.term = pEntry->term, + }; + ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta); } syncLeaderTransferDestroy(pSyncLeaderTransfer); @@ -3314,18 +3290,20 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde // execute fsm in apply thread, or execute outside syncPropose if (internalExecute) { - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = ths->state; - cbMeta.seqNum = pEntry->seqNum; - cbMeta.term = pEntry->term; - cbMeta.currentTerm = ths->pRaftStore->currentTerm; - cbMeta.flag = flag; + SFsmCbMeta cbMeta = { + .index = pEntry->index, + .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index), + .isWeak = pEntry->isWeak, + .code = 0, + .state = ths->state, + .seqNum = pEntry->seqNum, + .term = pEntry->term, + .currentTerm = ths->pRaftStore->currentTerm, + .flag = flag, + }; - ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); + syncGetAndDelRespRpc(ths, cbMeta.seqNum, &rpcMsg.info); + ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta); } }