enh: remove sync batch propose in vnode
This commit is contained in:
parent
e8703cd4a4
commit
61cd753947
|
@ -16,7 +16,7 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "vnd.h"
|
||||
|
||||
#define BATCH_DISABLE 1
|
||||
#define BATCH_ENABLE 0
|
||||
|
||||
static inline bool vnodeIsMsgBlock(tmsg_t type) {
|
||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||
|
@ -87,7 +87,7 @@ static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
|
||||
if (code == TSDB_CODE_SYN_NOT_LEADER) {
|
||||
if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_RESTORING) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
} else {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
@ -99,15 +99,12 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code)
|
|||
}
|
||||
}
|
||||
|
||||
#if BATCH_ENABLE
|
||||
|
||||
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
|
||||
if (*arrSize <= 0) return;
|
||||
|
||||
#if BATCH_DISABLE
|
||||
int32_t code = syncPropose(pVnode->sync, pMsgArr[0], pIsWeakArr[0]);
|
||||
#else
|
||||
int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
|
||||
#endif
|
||||
|
||||
if (code > 0) {
|
||||
for (int32_t i = 0; i < *arrSize; ++i) {
|
||||
vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
|
||||
|
@ -177,7 +174,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
|
|||
continue;
|
||||
}
|
||||
|
||||
if (isBlock || BATCH_DISABLE) {
|
||||
if (isBlock) {
|
||||
vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
|
||||
}
|
||||
|
||||
|
@ -185,7 +182,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
|
|||
pIsWeakArr[arrayPos] = isWeak;
|
||||
arrayPos++;
|
||||
|
||||
if (isBlock || msg == numOfMsgs - 1 || BATCH_DISABLE) {
|
||||
if (isBlock || msg == numOfMsgs - 1) {
|
||||
vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
|
||||
}
|
||||
}
|
||||
|
@ -194,6 +191,65 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
|
|||
taosMemoryFree(pIsWeakArr);
|
||||
}
|
||||
|
||||
#else
|
||||
|
||||
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
|
||||
int32_t code = syncPropose(pVnode->sync, pMsg, isWeak);
|
||||
if (code > 0) {
|
||||
vnodeHandleWriteMsg(pVnode, pMsg);
|
||||
} else if (code == 0) {
|
||||
vnodeWaitBlockMsg(pVnode, pMsg);
|
||||
} else {
|
||||
if (terrno != 0) code = terrno;
|
||||
vnodeHandleProposeError(pVnode, pMsg, code);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnode *pVnode = pInfo->ahandle;
|
||||
int32_t vgId = pVnode->config.vgId;
|
||||
int32_t code = 0;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
|
||||
|
||||
for (int32_t msg = 0; msg < numOfMsgs; msg++) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
|
||||
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d, handle:%p", vgId, pMsg, isWeak,
|
||||
vnodeIsMsgBlock(pMsg->msgType), msg, numOfMsgs, pMsg->info.handle);
|
||||
|
||||
if (!pVnode->restored) {
|
||||
vGError("vgId:%d, msg:%p failed to process since restore not finished", vgId, pMsg);
|
||||
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_SYN_RESTORING);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
continue;
|
||||
}
|
||||
|
||||
code = vnodePreProcessWriteMsg(pVnode, pMsg);
|
||||
if (code != 0) {
|
||||
vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
|
||||
if (terrno != 0) code = terrno;
|
||||
vnodeHandleProposeError(pVnode, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
continue;
|
||||
}
|
||||
|
||||
code = vnodeProposeMsg(pVnode, pMsg, isWeak);
|
||||
|
||||
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnode *pVnode = pInfo->ahandle;
|
||||
int32_t vgId = pVnode->config.vgId;
|
||||
|
|
Loading…
Reference in New Issue