refactor(sync): add pre-commit interface
This commit is contained in:
parent
43e2f8bc2b
commit
0addf09996
|
@ -506,7 +506,7 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
||||||
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
||||||
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
if (cbMeta.code == 0) {
|
if (cbMeta.code == 0 && cbMeta.isWeak == 0) {
|
||||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||||
|
@ -529,6 +529,23 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet
|
||||||
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
|
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
|
||||||
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
||||||
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
|
if (cbMeta.code == 0 && cbMeta.isWeak == 1) {
|
||||||
|
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||||
|
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||||
|
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||||
|
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
|
||||||
|
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
||||||
|
rpcMsg.info.conn.applyTerm = cbMeta.term;
|
||||||
|
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||||
|
} else {
|
||||||
|
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
|
||||||
|
vError("vgId:%d, sync pre-commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
|
||||||
|
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
|
||||||
|
if (rsp.info.handle != NULL) {
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
|
|
|
@ -67,11 +67,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) {
|
for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) {
|
||||||
bool agree = syncAgree(pSyncNode, index);
|
bool agree = syncAgree(pSyncNode, index);
|
||||||
|
|
||||||
if (gRaftDetailLog) {
|
|
||||||
sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%" PRId64 ", pSyncNode->commitIndex:%" PRId64, agree,
|
|
||||||
index, pSyncNode->commitIndex);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (agree) {
|
if (agree) {
|
||||||
// term
|
// term
|
||||||
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
|
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
|
||||||
|
@ -82,20 +77,15 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
// update commit index
|
// update commit index
|
||||||
newCommitIndex = index;
|
newCommitIndex = index;
|
||||||
|
|
||||||
if (gRaftDetailLog) {
|
|
||||||
sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%" PRId64
|
|
||||||
" commit, pSyncNode->commitIndex:%" PRId64,
|
|
||||||
newCommitIndex, pSyncNode->commitIndex);
|
|
||||||
}
|
|
||||||
|
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestory(pEntry);
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
if (gRaftDetailLog) {
|
do {
|
||||||
sTrace("syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%" PRIu64
|
char logBuf[128];
|
||||||
", pSyncNode->pRaftStore->currentTerm:%" PRIu64,
|
snprintf(logBuf, sizeof(logBuf), "can not commit due to term not equal, index:%ld, term:%lu", pEntry->index,
|
||||||
pEntry->term, pSyncNode->pRaftStore->currentTerm);
|
pEntry->term);
|
||||||
}
|
syncNodeEventLog(pSyncNode, logBuf);
|
||||||
|
} while (0);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestory(pEntry);
|
||||||
|
@ -107,10 +97,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
|
SyncIndex beginIndex = pSyncNode->commitIndex + 1;
|
||||||
SyncIndex endIndex = newCommitIndex;
|
SyncIndex endIndex = newCommitIndex;
|
||||||
|
|
||||||
if (gRaftDetailLog) {
|
|
||||||
sTrace("syncMaybeAdvanceCommitIndex sync commit %" PRId64, newCommitIndex);
|
|
||||||
}
|
|
||||||
|
|
||||||
// update commit index
|
// update commit index
|
||||||
pSyncNode->commitIndex = newCommitIndex;
|
pSyncNode->commitIndex = newCommitIndex;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue