Merge pull request #15394 from taosdata/fix/batch
enh: add batch processing method to vnode
This commit is contained in:
commit
d77ea8a15c
|
@ -16,23 +16,28 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "vnd.h"
|
||||
|
||||
#define BATCH_DISABLE 1
|
||||
|
||||
static inline bool vnodeIsMsgBlock(tmsg_t type) {
|
||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) ||
|
||||
(type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL);
|
||||
(type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) ||
|
||||
(type == TDMT_VND_ALTER_REPLICA);
|
||||
}
|
||||
|
||||
static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
|
||||
|
||||
static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||
if (vnodeIsMsgBlock(pMsg->msgType)) {
|
||||
vTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
tsem_wait(&pVnode->syncSem);
|
||||
}
|
||||
}
|
||||
|
||||
static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) {
|
||||
if (vnodeIsMsgBlock(pMsg->msgType)) {
|
||||
vTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
|
||||
tsem_post(&pVnode->syncSem);
|
||||
}
|
||||
}
|
||||
|
@ -124,60 +129,137 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
tmsgSendRedirectRsp(&rsp, &newEpSet);
|
||||
}
|
||||
|
||||
static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||
rsp.code = terrno;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr());
|
||||
}
|
||||
if (rsp.info.handle != NULL) {
|
||||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
}
|
||||
|
||||
static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) {
|
||||
if (code == TSDB_CODE_SYN_NOT_LEADER) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
} else {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code);
|
||||
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
||||
if (rsp.info.handle != NULL) {
|
||||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
int32_t code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
|
||||
|
||||
if (code > 0) {
|
||||
ASSERT(0);
|
||||
} else if (code == 0) {
|
||||
vnodeWaitBlockMsg(pVnode, pMsg);
|
||||
} else {
|
||||
if (terrno != 0) code = terrno;
|
||||
vnodeHandleProposeError(pVnode, pMsg, code);
|
||||
}
|
||||
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
|
||||
if (*arrSize <= 0) return;
|
||||
|
||||
#if BATCH_DISABLE
|
||||
int32_t code = syncPropose(pVnode->sync, pMsgArr[0], pIsWeakArr[0]);
|
||||
#else
|
||||
int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize);
|
||||
#endif
|
||||
|
||||
if (code > 0) {
|
||||
for (int32_t i = 0; i < *arrSize; ++i) {
|
||||
vnodeHandleWriteMsg(pVnode, pMsgArr[i]);
|
||||
}
|
||||
} else if (code == 0) {
|
||||
vnodeWaitBlockMsg(pVnode, pMsgArr[*arrSize - 1]);
|
||||
} else {
|
||||
if (terrno != 0) code = terrno;
|
||||
for (int32_t i = 0; i < *arrSize; ++i) {
|
||||
vnodeHandleProposeError(pVnode, pMsgArr[i], code);
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < *arrSize; ++i) {
|
||||
SRpcMsg *pMsg = pMsgArr[i];
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
*arrSize = 0;
|
||||
}
|
||||
|
||||
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnode *pVnode = pInfo->ahandle;
|
||||
int32_t vgId = pVnode->config.vgId;
|
||||
int32_t code = 0;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
||||
int32_t arrayPos = 0;
|
||||
SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg*));
|
||||
bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool));
|
||||
vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs);
|
||||
|
||||
for (int32_t m = 0; m < numOfMsgs; m++) {
|
||||
for (int32_t msg = 0; msg < numOfMsgs; msg++) {
|
||||
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
||||
bool isWeak = vnodeIsMsgWeak(pMsg->msgType);
|
||||
bool isBlock = vnodeIsMsgBlock(pMsg->msgType);
|
||||
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
|
||||
vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
|
||||
isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);
|
||||
|
||||
if (pMsgArr == NULL || pIsWeakArr == NULL) {
|
||||
vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
|
||||
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_OUT_OF_MEMORY);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
continue;
|
||||
}
|
||||
|
||||
code = vnodePreProcessWriteMsg(pVnode, pMsg);
|
||||
if (code != 0) {
|
||||
vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
|
||||
} else {
|
||||
if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
|
||||
code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
|
||||
} else {
|
||||
code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
|
||||
if (code > 0) {
|
||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||
rsp.code = terrno;
|
||||
vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
|
||||
}
|
||||
if (rsp.info.handle != NULL) {
|
||||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
} else if (code == 0) {
|
||||
vnodeWaitBlockMsg(pVnode, pMsg);
|
||||
} else {
|
||||
}
|
||||
}
|
||||
vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (code < 0) {
|
||||
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
} else {
|
||||
if (terrno != 0) code = terrno;
|
||||
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
|
||||
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
||||
if (rsp.info.handle != NULL) {
|
||||
tmsgSendRsp(&rsp);
|
||||
}
|
||||
}
|
||||
if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
|
||||
vnodeHandleAlterReplicaReq(pVnode, pMsg);
|
||||
continue;
|
||||
}
|
||||
|
||||
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
if (isBlock || BATCH_DISABLE) {
|
||||
vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
|
||||
}
|
||||
|
||||
pMsgArr[arrayPos] = pMsg;
|
||||
pIsWeakArr[arrayPos] = isWeak;
|
||||
arrayPos++;
|
||||
|
||||
if (isBlock || msg == numOfMsgs - 1 || BATCH_DISABLE) {
|
||||
vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos);
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFree(pMsgArr);
|
||||
taosMemoryFree(pIsWeakArr);
|
||||
}
|
||||
|
||||
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
|
|
Loading…
Reference in New Issue