refact vnode write queue

This commit is contained in:
Shengliang Guan 2022-03-21 14:39:35 +08:00
parent 90e9447370
commit 50229512d3
3 changed files with 27 additions and 13 deletions

View File

@ -63,36 +63,50 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
} }
static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code};
dndSendRsp(pWrapper, &rsp);
}
static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) {
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
if (pArray == NULL) {
dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr());
return;
}
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
SNodeMsg *pMsg = NULL; SNodeMsg *pMsg = NULL;
taosGetQitem(qall, (void **)&pMsg); if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
dTrace("msg:%p, will be processed in vnode write queue", pMsg);
void *ptr = taosArrayPush(pArray, &pMsg); dTrace("msg:%p, will be processed in vnode-write queue", pMsg);
assert(ptr != NULL); if (taosArrayPush(pArray, &pMsg) == NULL) {
dTrace("msg:%p, failed to process since %s", pMsg, terrstr());
vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY);
}
} }
vnodeProcessWMsgs(pVnode->pImpl, pArray); vnodeProcessWMsgs(pVnode->pImpl, pArray);
for (size_t i = 0; i < numOfMsgs; i++) { numOfMsgs = taosArrayGetSize(pArray);
SRpcMsg *pRsp = NULL; for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp); SRpcMsg *pRsp = NULL;
int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp);
if (pRsp != NULL) { if (pRsp != NULL) {
pRsp->ahandle = pRpc->ahandle; pRsp->ahandle = pRpc->ahandle;
dndSendRsp(pVnode->pWrapper, pRsp); dndSendRsp(pVnode->pWrapper, pRsp);
free(pRsp); free(pRsp);
} else { } else {
if (code != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code}; SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code};
dndSendRsp(pVnode->pWrapper, &rpcRsp); dndSendRsp(pVnode->pWrapper, &rpcRsp);
} }
} }
for (size_t i = 0; i < numOfMsgs; i++) { for (int32_t i = 0; i < numOfMsgs; i++) {
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
rpcFreeCont(pMsg->rpcMsg.pCont); rpcFreeCont(pMsg->rpcMsg.pCont);

View File

@ -126,9 +126,8 @@ void vnodeDestroy(const char *path);
* *
* @param pVnode The vnode object. * @param pVnode The vnode object.
* @param pMsgs The array of SRpcMsg * @param pMsgs The array of SRpcMsg
* @return int 0 for success, -1 for failure
*/ */
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
/** /**
* @brief Apply a write request message. * @brief Apply a write request message.

View File

@ -16,7 +16,7 @@
#include "tq.h" #include "tq.h"
#include "vnd.h" #include "vnd.h"
int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SNodeMsg *pMsg; SNodeMsg *pMsg;
SRpcMsg *pRpc; SRpcMsg *pRpc;
@ -40,7 +40,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
// TODO: Integrate RAFT module here // TODO: Integrate RAFT module here
return 0; // No results are returned because error handling is difficult
// return 0;
} }
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {