enh: send rsp of negotiation failure ahead of apply queue

This commit is contained in:
Benguang Zhao 2023-03-09 16:18:44 +08:00
parent b1d21472ed
commit 9d10739908
3 changed files with 14 additions and 9 deletions

View File

@ -306,13 +306,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
void *pReq;
int32_t len;
int32_t ret;
/*
if (!pVnode->inUse) {
terrno = TSDB_CODE_VND_NO_AVAIL_BUFPOOL;
vError("vgId:%d, not ready to write since %s", TD_VID(pVnode), terrstr());
return -1;
}
*/
if (version <= pVnode->state.applied) {
vError("vgId:%d, duplicate write request. version: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), version,
pVnode->state.applied);

View File

@ -429,7 +429,18 @@ static int32_t vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsm
pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code,
pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType), pMsg->code);
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
if (pMsg->code == 0) {
return tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg);
}
vnodePostBlockMsg(pVnode, pMsg);
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = NULL;
return 0;
}
static int32_t vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) {

View File

@ -475,7 +475,7 @@ _out:
int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTerm term, SSyncRaftEntry* pEntry,
int32_t applyCode) {
if ((pNode->replicaNum == 1) && pNode->restoreFinish && pNode->vgId != 1) {
if (pNode->replicaNum == 1 && pNode->restoreFinish && pNode->vgId != 1) {
return 0;
}