From 8dacdd57c5e57d7297d9553807c46ed214f4c518 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 25 Jul 2022 17:10:04 +0800 Subject: [PATCH 1/2] enh: add batch processing method to vnode --- source/dnode/vnode/src/vnd/vnodeSync.c | 158 +++++++++++++++++++------ 1 file changed, 120 insertions(+), 38 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index dbe4458681..b00d6b5eb6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -16,23 +16,28 @@ #define _DEFAULT_SOURCE #include "vnd.h" +#define BATCH_DISABLE 1 + static inline bool vnodeIsMsgBlock(tmsg_t type) { return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_CREATE_TABLE) || - (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL); + (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) || (type == TDMT_VND_UPDATE_TAG_VAL) || + (type == TDMT_VND_ALTER_REPLICA); } static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; } static inline void vnodeWaitBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { if (vnodeIsMsgBlock(pMsg->msgType)) { - vTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, msg:%p wait block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); tsem_wait(&pVnode->syncSem); } } static inline void vnodePostBlockMsg(SVnode *pVnode, const SRpcMsg *pMsg) { if (vnodeIsMsgBlock(pMsg->msgType)) { - vTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, msg:%p post block, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); tsem_post(&pVnode->syncSem); } } @@ -124,60 +129,137 @@ void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) { tmsgSendRedirectRsp(&rsp, &newEpSet); } +static void inline vnodeHandleWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { + SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; + if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { + rsp.code = terrno; + const STraceId *trace = &pMsg->info.traceId; + vGError("vgId:%d, msg:%p failed to apply right now since %s", pVnode->config.vgId, pMsg, terrstr()); + } + if (rsp.info.handle != NULL) { + tmsgSendRsp(&rsp); + } +} + +static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) { + if (code == TSDB_CODE_SYN_NOT_LEADER) { + vnodeRedirectRpcMsg(pVnode, pMsg); + } else { + const STraceId *trace = &pMsg->info.traceId; + vGError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", pVnode->config.vgId, pMsg, tstrerror(code), code); + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + if (rsp.info.handle != NULL) { + tmsgSendRsp(&rsp); + } + } +} + +static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { + int32_t code = vnodeProcessAlterReplicaReq(pVnode, pMsg); + + if (code > 0) { + ASSERT(0); + } else if (code == 0) { + vnodeWaitBlockMsg(pVnode, pMsg); + } else { + if (terrno != 0) code = terrno; + vnodeHandleProposeError(pVnode, pMsg, code); + } + + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + +static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg *pMsgArr, bool *pIsWeakArr, int32_t *arrSize) { + if (*arrSize <= 0) return; + +#if BATCH_DISABLE + int32_t code = syncPropose(pVnode->sync, pMsgArr, pIsWeakArr[0]); +#else + int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize); +#endif + + if (code > 0) { + for (int32_t i = 0; i < *arrSize; ++i) { + vnodeHandleWriteMsg(pVnode, pMsgArr + i); + } + } else if (code == 0) { + vnodeWaitBlockMsg(pVnode, pMsgArr + (*arrSize - 1)); + } else { + if (terrno != 0) code = terrno; + for (int32_t i = 0; i < *arrSize; ++i) { + vnodeHandleProposeError(pVnode, pMsgArr + i, code); + } + } + + for (int32_t i = 0; i < *arrSize; ++i) { + SRpcMsg *pMsg = pMsgArr + i; + rpcFreeCont(pMsg->pCont); + } + + *arrSize = 0; +} + void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { SVnode *pVnode = pInfo->ahandle; int32_t vgId = pVnode->config.vgId; int32_t code = 0; SRpcMsg *pMsg = NULL; - + int32_t arrayPos = 0; + SRpcMsg *pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg)); + bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool)); vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs); - for (int32_t m = 0; m < numOfMsgs; m++) { + for (int32_t msg = 0; msg < numOfMsgs; msg++) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + bool isWeak = vnodeIsMsgWeak(pMsg->msgType); + bool isBlock = vnodeIsMsgBlock(pMsg->msgType); + const STraceId *trace = &pMsg->info.traceId; - vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle); + vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg, + isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle); + + if (pMsgArr == NULL || pIsWeakArr == NULL) { + vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg); + vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_OUT_OF_MEMORY); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + continue; + } code = vnodePreProcessWriteMsg(pVnode, pMsg); if (code != 0) { - vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); - } else { - if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { - code = vnodeProcessAlterReplicaReq(pVnode, pMsg); - } else { - code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType)); - if (code > 0) { - SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; - if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) { - rsp.code = terrno; - vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr()); - } - if (rsp.info.handle != NULL) { - tmsgSendRsp(&rsp); - } - } else if (code == 0) { - vnodeWaitBlockMsg(pVnode, pMsg); - } else { - } - } + vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + continue; } - if (code < 0) { - if (terrno == TSDB_CODE_SYN_NOT_LEADER) { - vnodeRedirectRpcMsg(pVnode, pMsg); - } else { - if (terrno != 0) code = terrno; - vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code); - SRpcMsg rsp = {.code = code, .info = pMsg->info}; - if (rsp.info.handle != NULL) { - tmsgSendRsp(&rsp); - } - } + if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) { + vnodeHandleAlterReplicaReq(pVnode, pMsg); + continue; + } + + if (isBlock || BATCH_DISABLE) { + vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos); + } + + pMsgArr[arrayPos] = *pMsg; + pIsWeakArr[arrayPos] = isWeak; + arrayPos++; + + if (isBlock || msg == numOfMsgs - 1 || BATCH_DISABLE) { + vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos); } vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code); - rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } + + taosMemoryFree(pMsgArr); + taosMemoryFree(pIsWeakArr); } void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { From 3bceeef43971a68102ee299a6201de4385527dd6 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 25 Jul 2022 17:16:26 +0800 Subject: [PATCH 2/2] enh: add batch processing method to vnode --- source/dnode/vnode/src/vnd/vnodeSync.c | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index b00d6b5eb6..6bc057e5ac 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -172,31 +172,34 @@ static void vnodeHandleAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg *pMsgArr, bool *pIsWeakArr, int32_t *arrSize) { +static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) { if (*arrSize <= 0) return; #if BATCH_DISABLE - int32_t code = syncPropose(pVnode->sync, pMsgArr, pIsWeakArr[0]); + int32_t code = syncPropose(pVnode->sync, pMsgArr[0], pIsWeakArr[0]); #else int32_t code = syncProposeBatch(pVnode->sync, pMsgArr, pIsWeakArr, *arrSize); #endif if (code > 0) { for (int32_t i = 0; i < *arrSize; ++i) { - vnodeHandleWriteMsg(pVnode, pMsgArr + i); + vnodeHandleWriteMsg(pVnode, pMsgArr[i]); } } else if (code == 0) { - vnodeWaitBlockMsg(pVnode, pMsgArr + (*arrSize - 1)); + vnodeWaitBlockMsg(pVnode, pMsgArr[*arrSize - 1]); } else { if (terrno != 0) code = terrno; for (int32_t i = 0; i < *arrSize; ++i) { - vnodeHandleProposeError(pVnode, pMsgArr + i, code); + vnodeHandleProposeError(pVnode, pMsgArr[i], code); } } for (int32_t i = 0; i < *arrSize; ++i) { - SRpcMsg *pMsg = pMsgArr + i; + SRpcMsg *pMsg = pMsgArr[i]; + const STraceId *trace = &pMsg->info.traceId; + vGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->config.vgId, pMsg, code); rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); } *arrSize = 0; @@ -208,7 +211,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) int32_t code = 0; SRpcMsg *pMsg = NULL; int32_t arrayPos = 0; - SRpcMsg *pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg)); + SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg*)); bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool)); vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs); @@ -246,16 +249,13 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos); } - pMsgArr[arrayPos] = *pMsg; + pMsgArr[arrayPos] = pMsg; pIsWeakArr[arrayPos] = isWeak; arrayPos++; if (isBlock || msg == numOfMsgs - 1 || BATCH_DISABLE) { vnodeProposeBatchMsg(pVnode, pMsgArr, pIsWeakArr, &arrayPos); } - - vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code); - taosFreeQitem(pMsg); } taosMemoryFree(pMsgArr);