enh: add batch processing method to vnode

This commit is contained in:
Shengliang Guan 2022-07-25 17:16:26 +08:00
parent 8dacdd57c5
commit 3bceeef439
1 changed files with 11 additions and 11 deletions

View File

@ -172,31 +172,34 @@ static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
taosFreeQitem(pMsg);
}
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg *pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
if (*arrSize <= 0) return;
#if BATCH_DISABLE
int32_t code = syncPropose(pVnode->sync, pMsgArr, pIsWeakArr[0]);
int32_t code = syncPropose(pVnode->sync, pMsgArr[0], pIsWeakArr[0]);
#else
int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
#endif
if (code > 0) {
for (int32_t i = 0; i < *arrSize; ++i) {
vnodeHandleWriteMsg(pVnode, pMsgArr + i);
vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
}
} else if (code == 0) {
vnodeWaitBlockMsg(pVnode, pMsgArr + (*arrSize - 1));
vnodeWaitBlockMsg(pVnode, pMsgArr[*arrSize - 1]);
} else {
if (terrno != 0) code = terrno;
for (int32_t i = 0; i < *arrSize; ++i) {
vnodeHandleProposeError(pVnode, pMsgArr + i, code);
vnodeHandleProposeError(pVnode, pMsgArr[i], code);
}
}
for (int32_t i = 0; i < *arrSize; ++i) {
SRpcMsg *pMsg = pMsgArr + i;
SRpcMsg *pMsg = pMsgArr[i];
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
*arrSize = 0;
@ -208,7 +211,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
int32_t code = 0;
SRpcMsg *pMsg = NULL;
int32_t arrayPos = 0;
SRpcMsg *pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg));
SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg*));
bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
@ -246,16 +249,13 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
}
pMsgArr[arrayPos] = *pMsg;
pMsgArr[arrayPos] = pMsg;
pIsWeakArr[arrayPos] = isWeak;
arrayPos++;
if (isBlock || msg == numOfMsgs - 1 || BATCH_DISABLE) {
vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
}
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
taosFreeQitem(pMsg);
}
taosMemoryFree(pMsgArr);