enh: refact sync callback func

This commit is contained in:
Shengliang Guan 2022-11-01 15:15:58 +08:00
parent e79e50ae36
commit f4dc7d0766
7 changed files with 114 additions and 183 deletions

View File

@ -132,27 +132,27 @@ typedef struct SSnapshotMeta {
typedef struct SSyncFSM { typedef struct SSyncFSM {
void* data; void* data;
void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta *pMeta);
void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpRollBackCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta); void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
void (*FpBecomeLeaderCb)(struct SSyncFSM* pFsm); void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm);
void (*FpBecomeFollowerCb)(struct SSyncFSM* pFsm); void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader); int32_t (*FpGetSnapshot)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshotInfo)(const struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader); int32_t (*FpSnapshotStartRead)(const struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader);
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); int32_t (*FpSnapshotStopRead)(const struct SSyncFSM* pFsm, void* pReader);
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); int32_t (*FpSnapshotDoRead)(const struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter); int32_t (*FpSnapshotStartWrite)(const struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter);
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot); int32_t (*FpSnapshotStopWrite)(const struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot);
int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len); int32_t (*FpSnapshotDoWrite)(const struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
} SSyncFSM; } SSyncFSM;

View File

@ -28,8 +28,6 @@ typedef struct SRaftId {
SyncGroupId vgId; SyncGroupId vgId;
} SRaftId; } SRaftId;
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo);
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb); void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb);
char* sync2SimpleStr(int64_t rid); char* sync2SimpleStr(int64_t rid);

View File

@ -90,7 +90,6 @@ typedef struct {
int32_t errCode; int32_t errCode;
int32_t transId; int32_t transId;
SRWLatch lock; SRWLatch lock;
int8_t leaderTransferFinish;
int8_t selfIndex; int8_t selfIndex;
int8_t numOfReplicas; int8_t numOfReplicas;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];

View File

@ -428,18 +428,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
void mndPreClose(SMnode *pMnode) { void mndPreClose(SMnode *pMnode) {
if (pMnode != NULL) { if (pMnode != NULL) {
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 0);
syncLeaderTransfer(pMnode->syncMgmt.sync); syncLeaderTransfer(pMnode->syncMgmt.sync);
#if 0
mInfo("vgId:1, mnode start leader transfer");
// wait for leader transfer finish
while (!atomic_load_8(&(pMnode->syncMgmt.leaderTransferFinish))) {
taosMsleep(10);
mInfo("vgId:1, mnode waiting for leader transfer");
}
mInfo("vgId:1, mnode finish leader transfer");
#endif
} }
} }

View File

@ -39,25 +39,25 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code; return code;
} }
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { void mndSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
// delete msg handle // delete msg handle
SRpcMsg rpcMsg = {0}; SRpcMsg rpcMsg = {0};
syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info); rpcMsg.info = pMsg->info;
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
pMgmt->errCode = cbMeta.code; pMgmt->errCode = pMeta->code;
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
" role:%s raw:%p", " role:%s raw:%p",
transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state), transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
pRaw); pRaw);
if (pMgmt->errCode == 0) { if (pMgmt->errCode == 0) {
sdbWriteWithoutFree(pMnode->pSdb, pRaw); sdbWriteWithoutFree(pMnode->pSdb, pRaw);
sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
} }
taosWLockLatch(&pMgmt->lock); taosWLockLatch(&pMgmt->lock);
@ -87,7 +87,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
} }
} }
int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
mInfo("start to read snapshot from sdb in atomic way"); mInfo("start to read snapshot from sdb in atomic way");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm,
@ -95,13 +95,13 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pR
return 0; return 0;
} }
int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { int32_t mndSyncGetSnapshotInfo(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex);
return 0; return 0;
} }
void mndRestoreFinish(struct SSyncFSM *pFsm) { void mndRestoreFinish(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
if (!pMnode->deploy) { if (!pMnode->deploy) {
@ -113,32 +113,30 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
} }
} }
void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {} int32_t mndSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
mInfo("start to read snapshot from sdb"); mInfo("start to read snapshot from sdb");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL); return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
} }
int32_t mndSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { int32_t mndSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
mInfo("stop to read snapshot from sdb"); mInfo("stop to read snapshot from sdb");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStopRead(pMnode->pSdb, pReader); return sdbStopRead(pMnode->pSdb, pReader);
} }
int32_t mndSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { int32_t mndSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len); return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
} }
int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { int32_t mndSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
mInfo("start to apply snapshot to sdb"); mInfo("start to apply snapshot to sdb");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter); return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
} }
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { int32_t mndSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply, mInfo("stop to apply snapshot to sdb, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, isApply,
pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex); pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
@ -146,18 +144,12 @@ int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply,
pSnapshot->lastConfigIndex); pSnapshot->lastConfigIndex);
} }
int32_t mndSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { int32_t mndSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len); return sdbDoWrite(pMnode->pSdb, pWriter, pBuf, len);
} }
void mndLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void mndBecomeFollower(const SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
atomic_store_8(&(pMnode->syncMgmt.leaderTransferFinish), 1);
mInfo("vgId:1, mnode leader transfer finish");
}
static void mndBecomeFollower(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
mInfo("vgId:1, become follower"); mInfo("vgId:1, become follower");
@ -172,7 +164,7 @@ static void mndBecomeFollower(struct SSyncFSM *pFsm) {
taosWUnLockLatch(&pMnode->syncMgmt.lock); taosWUnLockLatch(&pMnode->syncMgmt.lock);
} }
static void mndBecomeLeader(struct SSyncFSM *pFsm) { static void mndBecomeLeader(const SSyncFSM *pFsm) {
mInfo("vgId:1, become leader"); mInfo("vgId:1, become leader");
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
} }
@ -184,8 +176,8 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
pFsm->FpPreCommitCb = NULL; pFsm->FpPreCommitCb = NULL;
pFsm->FpRollBackCb = NULL; pFsm->FpRollBackCb = NULL;
pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpRestoreFinishCb = mndRestoreFinish;
pFsm->FpLeaderTransferCb = mndLeaderTransfer; pFsm->FpLeaderTransferCb = NULL;
pFsm->FpReConfigCb = mndReConfig; pFsm->FpReConfigCb = NULL;
pFsm->FpBecomeLeaderCb = mndBecomeLeader; pFsm->FpBecomeLeaderCb = mndBecomeLeader;
pFsm->FpBecomeFollowerCb = mndBecomeFollower; pFsm->FpBecomeFollowerCb = mndBecomeFollower;
pFsm->FpGetSnapshot = mndSyncGetSnapshot; pFsm->FpGetSnapshot = mndSyncGetSnapshot;
@ -256,32 +248,26 @@ void mndCleanupSync(SMnode *pMnode) {
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) { int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)}; pMgmt->errCode = 0;
if (req.contLen <= 0) {
terrno = TSDB_CODE_APP_ERROR;
return -1;
}
SRpcMsg req = {.msgType = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
req.pCont = rpcMallocCont(req.contLen); req.pCont = rpcMallocCont(req.contLen);
if (req.pCont == NULL) return -1; if (req.pCont == NULL) return -1;
memcpy(req.pCont, pRaw, req.contLen); memcpy(req.pCont, pRaw, req.contLen);
pMgmt->errCode = 0;
taosWLockLatch(&pMgmt->lock); taosWLockLatch(&pMgmt->lock);
if (pMgmt->transId != 0) { if (pMgmt->transId != 0) {
mError("trans:%d, can't be proposed since trans:%d alrady waiting for confirm", transId, pMgmt->transId); mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosWUnLockLatch(&pMgmt->lock); taosWUnLockLatch(&pMgmt->lock);
terrno = TSDB_CODE_APP_NOT_READY; terrno = TSDB_CODE_APP_NOT_READY;
return -1; return -1;
} else {
pMgmt->transId = transId;
mInfo("trans:%d, will be proposed", pMgmt->transId);
taosWUnLockLatch(&pMgmt->lock);
} }
const bool isWeak = false; mInfo("trans:%d, will be proposed", transId);
int32_t code = syncPropose(pMgmt->sync, &req, isWeak); pMgmt->transId = transId;
taosWUnLockLatch(&pMgmt->lock);
int32_t code = syncPropose(pMgmt->sync, &req, false);
if (code == 0) { if (code == 0) {
mInfo("trans:%d, is proposing and wait sem", pMgmt->transId); mInfo("trans:%d, is proposing and wait sem", pMgmt->transId);
tsem_wait(&pMgmt->syncSem); tsem_wait(&pMgmt->syncSem);
@ -294,8 +280,8 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID); sdbSetApplyInfo(pMnode->pSdb, req.info.conn.applyIndex, req.info.conn.applyTerm, SYNC_INDEX_INVALID);
code = 0; code = 0;
} else { } else {
taosWLockLatch(&pMgmt->lock);
mInfo("trans:%d, failed to proposed since %s", transId, terrstr()); mInfo("trans:%d, failed to proposed since %s", transId, terrstr());
taosWLockLatch(&pMgmt->lock);
pMgmt->transId = 0; pMgmt->transId = 0;
taosWUnLockLatch(&pMgmt->lock); taosWUnLockLatch(&pMgmt->lock);
if (terrno == TSDB_CODE_SYN_NOT_LEADER) { if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
@ -311,7 +297,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
return code; return code;
} }
if (pMgmt->errCode != 0) terrno = pMgmt->errCode; terrno = pMgmt->errCode;
return pMgmt->errCode; return pMgmt->errCode;
} }

View File

@ -237,7 +237,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t code = syncProcessMsg(pVnode->sync, pMsg); int32_t code = syncProcessMsg(pVnode->sync, pMsg);
if (code != 0) { if (code != 0) {
vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg, vGError("vgId:%d, failed to process sync msg:%p type:%s since %s", pVnode->config.vgId, pMsg,
TMSG_INFO(pMsg->msgType), terrstr()); TMSG_INFO(pMsg->msgType), terrstr());
} }
return code; return code;
@ -278,78 +278,61 @@ static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
return code; return code;
} }
static int32_t vnodeSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot) {
vnodeGetSnapshot(pFsm->data, pSnapshot); vnodeGetSnapshot(pFsm->data, pSnapshot);
return 0; return 0;
} }
static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta *cbMeta) {} static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data;
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { if (pMeta->code == 0) {
if (cbMeta.isWeak == 0) { SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
SVnode *pVnode = pFsm->data; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
rpcMsg.info = pMsg->info;
rpcMsg.info.conn.applyIndex = pMeta->index;
rpcMsg.info.conn.applyTerm = pMeta->term;
if (cbMeta.code == 0) { const STraceId *trace = &pMsg->info.traceId;
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", weak:%d, code:%d, state:%d %s, type:%s", ", weak:%d, code:%d, state:%d %s, type:%s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak, syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak,
cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); pMeta->code, pMeta->state, syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType));
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else { } else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info}; SRpcMsg rsp = {.code = pMeta->code, .info = pMsg->info};
vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync), vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync),
TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code)); TMSG_INFO(pMsg->msgType), pMeta->index, pMeta->code, tstrerror(pMeta->code));
if (rsp.info.handle != NULL) { if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
}
} }
} }
} }
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
if (cbMeta.isWeak == 1) { if (pMeta->isWeak == 0) {
SVnode *pVnode = pFsm->data; vnodeSyncApplyMsg(pFsm->data, pMsg, pMeta);
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, pre-commit-cb execute error, type:%s, error:0x%x %s", syncGetVgId(pVnode->sync),
TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
} }
} }
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
if (pMeta->isWeak == 1) {
vnodeSyncApplyMsg(pFsm->data, pMsg, pMeta);
}
}
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncGetVgId(pVnode->sync), pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state,
syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType)); syncUtilState2String(pMeta->state), TMSG_INFO(pMsg->msgType));
} }
#define USE_TSDB_SNAPSHOT #define USE_TSDB_SNAPSHOT
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam; SSnapshotParam *pSnapshotParam = pParam;
@ -361,7 +344,7 @@ static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void
#endif #endif
} }
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { static int32_t vnodeSnapshotStopRead(const SSyncFSM *pFsm, void *pReader) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapReaderClose(pReader); int32_t code = vnodeSnapReaderClose(pReader);
@ -372,7 +355,7 @@ static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
#endif #endif
} }
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len); int32_t code = vnodeSnapRead(pReader, (uint8_t **)ppBuf, len);
@ -391,7 +374,7 @@ static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **
#endif #endif
} }
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam; SSnapshotParam *pSnapshotParam = pParam;
@ -415,7 +398,7 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
#endif #endif
} }
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64, vInfo("vgId:%d, stop write vnode snapshot, apply:%d, index:%" PRId64 " term:%" PRIu64 " config:%" PRId64,
@ -430,7 +413,7 @@ static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool
#endif #endif
} }
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { static int32_t vnodeSnapshotDoWrite(const SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
#ifdef USE_TSDB_SNAPSHOT #ifdef USE_TSDB_SNAPSHOT
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len); vDebug("vgId:%d, continue write vnode snapshot, len:%d", pVnode->config.vgId, len);
@ -442,9 +425,7 @@ static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *
#endif #endif
} }
static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {} static void vnodeRestoreFinish(const SSyncFSM *pFsm) {
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
do { do {
@ -464,7 +445,7 @@ static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId); vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
} }
static void vnodeBecomeFollower(struct SSyncFSM *pFsm) { static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, become follower", pVnode->config.vgId); vDebug("vgId:%d, become follower", pVnode->config.vgId);
@ -478,7 +459,7 @@ static void vnodeBecomeFollower(struct SSyncFSM *pFsm) {
taosThreadMutexUnlock(&pVnode->lock); taosThreadMutexUnlock(&pVnode->lock);
} }
static void vnodeBecomeLeader(struct SSyncFSM *pFsm) { static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
vDebug("vgId:%d, become leader", pVnode->config.vgId); vDebug("vgId:%d, become leader", pVnode->config.vgId);
@ -500,10 +481,10 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
pFsm->FpRollBackCb = vnodeSyncRollBackMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
pFsm->FpRestoreFinishCb = vnodeRestoreFinish; pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer; pFsm->FpLeaderTransferCb = NULL;
pFsm->FpBecomeLeaderCb = vnodeBecomeLeader; pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
pFsm->FpBecomeFollowerCb = vnodeBecomeFollower; pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpReConfigCb = NULL;
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead; pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead; pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;

View File

@ -737,30 +737,7 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
syncNodeRelease(pSyncNode); syncNodeRelease(pSyncNode);
} }
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
ASSERT(rid == pSyncNode->rid);
SRespStub stub;
int32_t ret = syncRespMgrGet(pSyncNode->pSyncRespMgr, index, &stub);
if (ret == 1) {
memcpy(msg, &(stub.rpcMsg), sizeof(SRpcMsg));
}
syncNodeRelease(pSyncNode);
return ret;
}
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
ASSERT(rid == pSyncNode->rid);
SRespStub stub; SRespStub stub;
int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub);
if (ret == 1) { if (ret == 1) {
@ -768,8 +745,6 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo)
} }
sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle); sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle);
syncNodeRelease(pSyncNode);
return ret;
} }
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) {
@ -3127,17 +3102,18 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
} }
if (ths->pFsm->FpLeaderTransferCb != NULL) { if (ths->pFsm->FpLeaderTransferCb != NULL) {
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {
cbMeta.code = 0; cbMeta.code = 0,
cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.currentTerm = ths->pRaftStore->currentTerm,
cbMeta.flag = 0; cbMeta.flag = 0,
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index,
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
cbMeta.isWeak = pEntry->isWeak; cbMeta.isWeak = pEntry->isWeak,
cbMeta.seqNum = pEntry->seqNum; cbMeta.seqNum = pEntry->seqNum,
cbMeta.state = ths->state; cbMeta.state = ths->state,
cbMeta.term = pEntry->term; cbMeta.term = pEntry->term,
ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, cbMeta); };
ths->pFsm->FpLeaderTransferCb(ths->pFsm, pRpcMsg, &cbMeta);
} }
syncLeaderTransferDestroy(pSyncLeaderTransfer); syncLeaderTransferDestroy(pSyncLeaderTransfer);
@ -3314,18 +3290,20 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
// execute fsm in apply thread, or execute outside syncPropose // execute fsm in apply thread, or execute outside syncPropose
if (internalExecute) { if (internalExecute) {
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {
cbMeta.index = pEntry->index; .index = pEntry->index,
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); .lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index),
cbMeta.isWeak = pEntry->isWeak; .isWeak = pEntry->isWeak,
cbMeta.code = 0; .code = 0,
cbMeta.state = ths->state; .state = ths->state,
cbMeta.seqNum = pEntry->seqNum; .seqNum = pEntry->seqNum,
cbMeta.term = pEntry->term; .term = pEntry->term,
cbMeta.currentTerm = ths->pRaftStore->currentTerm; .currentTerm = ths->pRaftStore->currentTerm,
cbMeta.flag = flag; .flag = flag,
};
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); syncGetAndDelRespRpc(ths, cbMeta.seqNum, &rpcMsg.info);
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
} }
} }