fix(sync): propose batch

This commit is contained in:
Minghao Li 2022-07-06 13:17:16 +08:00
parent 730a7a4a58
commit 6ec47a7a60
1 changed files with 32 additions and 6 deletions

View File

@ -707,14 +707,37 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe
syncClientRequestBatch2RpcMsg(pSyncMsg, &rpcMsg); syncClientRequestBatch2RpcMsg(pSyncMsg, &rpcMsg);
taosMemoryFree(pSyncMsg); // only free msg body, do not free rpc msg content taosMemoryFree(pSyncMsg); // only free msg body, do not free rpc msg content
if (pSyncNode->replicaNum == 1 && pSyncNode->vgId != 1) {
int32_t code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
if (code == 0) {
// update rpc msg applyIndex
SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pSyncMsg);
ASSERT(arrSize == pSyncMsg->dataCount);
for (int i = 0; i < arrSize; ++i) {
pMsgArr[i].info.conn.applyIndex = msgArr[i].info.conn.applyIndex;
syncRespMgrDel(pSyncNode->pSyncRespMgr, raftArr[i].seqNum);
}
rpcFreeCont(rpcMsg.pCont);
terrno = 0;
return 1;
} else {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
}
} else {
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
// enqueue msg ok // enqueue msg ok
return 0;
} else { } else {
sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId); sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
} }
}
return 0; return 0;
} }
@ -2490,6 +2513,9 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
// update rpc msg conn apply.index
msgArr[i].info.conn.applyIndex = pEntry->index;
} }
// fsync once // fsync once
@ -2498,7 +2524,7 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p
walFsync(pWal, true); walFsync(pWal, true);
if (ths->replicaNum > 1) { if (ths->replicaNum > 1) {
// if mulit replica, start replicate right now // if multi replica, start replicate right now
syncNodeReplicate(ths); syncNodeReplicate(ths);
} else if (ths->replicaNum == 1) { } else if (ths->replicaNum == 1) {