fix: get result of tmsPutToQueue for FpCommitCb
This commit is contained in:
parent
e97c8f637b
commit
2ce07c6ec1
|
@ -138,8 +138,8 @@ typedef struct SSnapshotMeta {
|
||||||
typedef struct SSyncFSM {
|
typedef struct SSyncFSM {
|
||||||
void* data;
|
void* data;
|
||||||
|
|
||||||
void (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
|
int32_t (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
|
||||||
void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
|
int32_t (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
|
||||||
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
|
void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
|
||||||
|
|
||||||
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm);
|
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm);
|
||||||
|
|
|
@ -72,7 +72,7 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
int32_t mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
SSdbRaw *pRaw = pMsg->pCont;
|
SSdbRaw *pRaw = pMsg->pCont;
|
||||||
|
@ -114,12 +114,15 @@ void mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *p
|
||||||
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
|
mError("trans:%d, not found while execute in mnode since %s", transId, terrstr());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
int32_t mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
mndProcessWriteMsg(pFsm, pMsg, pMeta);
|
int32_t code = mndProcessWriteMsg(pFsm, pMsg, pMeta);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
pMsg->pCont = NULL;
|
pMsg->pCont = NULL;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
|
int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) {
|
||||||
|
|
|
@ -295,7 +295,7 @@ static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
pMsg->info.conn.applyIndex = pMeta->index;
|
pMsg->info.conn.applyIndex = pMeta->index;
|
||||||
pMsg->info.conn.applyTerm = pMeta->term;
|
pMsg->info.conn.applyTerm = pMeta->term;
|
||||||
|
@ -306,17 +306,18 @@ static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbM
|
||||||
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
|
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
|
||||||
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
|
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
|
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
static int32_t vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
if (pMeta->isWeak == 1) {
|
if (pMeta->isWeak == 1) {
|
||||||
vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
return vnodeSyncApplyMsg(pFsm, pMsg, pMeta);
|
||||||
}
|
}
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {
|
||||||
|
|
|
@ -457,9 +457,9 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
|
||||||
cbMeta.flag = -1;
|
cbMeta.flag = -1;
|
||||||
|
|
||||||
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
|
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
|
||||||
pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
|
int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
|
||||||
ASSERT(rpcMsg.pCont == NULL);
|
ASSERT(rpcMsg.pCont == NULL);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
|
int32_t syncLogBufferValidate(SSyncLogBuffer* pBuf) {
|
||||||
|
@ -516,8 +516,9 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
|
||||||
}
|
}
|
||||||
|
|
||||||
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) {
|
if (syncLogFsmExecute(pNode, pFsm, role, term, pEntry) != 0) {
|
||||||
sError("vgId:%d, failed to execute sync log entry in FSM. log index:%" PRId64 ", term:%" PRId64 "", vgId,
|
sError("vgId:%d, failed to commit sync log entry. index:%" PRId64 ", term:%" PRId64
|
||||||
pEntry->index, pEntry->term);
|
", role: %d, current term: %" PRId64,
|
||||||
|
vgId, pEntry->index, pEntry->term, role, term);
|
||||||
goto _out;
|
goto _out;
|
||||||
}
|
}
|
||||||
pBuf->commitIndex = index;
|
pBuf->commitIndex = index;
|
||||||
|
|
Loading…
Reference in New Issue