From cd6d4a276b6146a996ea134c842a3831df814fda Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 21 Mar 2022 13:27:54 +0800 Subject: [PATCH 1/4] minor changes --- source/dnode/mgmt/vnode/src/vmWorker.c | 2 +- source/dnode/vnode/src/tsdb/tsdbFS.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index e5d5e38328..954fe5f6a8 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -210,7 +210,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType; - dTrace("msg:%p, will be processed in vnode mgmt queue", pMsg); + dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg); switch (msgType) { case TDMT_DND_CREATE_VNODE: diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 0f0d50ad08..fa867543b0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -1289,7 +1289,7 @@ static int tsdbRestoreCurrent(STsdb *pRepo) { } if (tsdbSaveFSStatus(pRepo, pRepo->fs->cstatus) < 0) { - tsdbError("vgId:%d failed to restore corrent since %s", REPO_ID(pRepo), tstrerror(terrno)); + tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno)); return -1; } From 90e944737072457e6565c577f2abe2edff5550f2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 21 Mar 2022 14:12:20 +0800 Subject: [PATCH 2/4] adjust vnode queue --- source/dnode/mgmt/vnode/inc/vmInt.h | 5 +- source/dnode/mgmt/vnode/src/vmInt.c | 3 + source/dnode/mgmt/vnode/src/vmWorker.c | 215 +++++++++++++------------ source/dnode/vnode/inc/vnode.h | 1 + 4 files changed, 123 insertions(+), 101 deletions(-) diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index c0e7e212cc..2020a3d219 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -24,6 +24,8 @@ extern "C" { #endif +typedef enum { VND_WRITE_QUEUE, VND_QUERY_QUEUE, VND_FETCH_QUEUE, VND_APPLY_QUEUE, VND_SYNC_QUEUE } EVndQueueType; + typedef struct SVnodesMgmt { SHashObj *hash; SRWLatch latch; @@ -102,7 +104,8 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode); int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pMsg); +int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); +int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg); diff --git a/source/dnode/mgmt/vnode/src/vmInt.c b/source/dnode/mgmt/vnode/src/vmInt.c index 8b2c2d7dd4..c5e79765c7 100644 --- a/source/dnode/mgmt/vnode/src/vmInt.c +++ b/source/dnode/mgmt/vnode/src/vmInt.c @@ -296,7 +296,10 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { vnodeOpt.nthreads = tsNumOfCommitThreads; vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue; + vnodeOpt.putToFetchQFp = vmPutMsgToQueryQueue; vnodeOpt.sendReqFp = dndSendReqToDnode; + vnodeOpt.sendMnodeReqFp = dndSendReqToMnode; + vnodeOpt.sendRspFp = dndSendRsp; if (vnodeInit(&vnodeOpt) != 0) { dError("failed to init vnode since %s", terrstr()); goto _OVER; diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 954fe5f6a8..29a8678701 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -16,6 +16,43 @@ #define _DEFAULT_SOURCE #include "vmInt.h" +static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + int32_t code = -1; + tmsg_t msgType = pMsg->rpcMsg.msgType; + dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg); + + switch (msgType) { + case TDMT_DND_CREATE_VNODE: + code = vmProcessCreateVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_ALTER_VNODE: + code = vmProcessAlterVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_DROP_VNODE: + code = vmProcessDropVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_SYNC_VNODE: + code = vmProcessSyncVnodeReq(pMgmt, pMsg); + break; + case TDMT_DND_COMPACT_VNODE: + code = vmProcessCompactVnodeReq(pMgmt, pMsg); + break; + default: + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + dError("msg:%p, not processed in vnode-mgmt queue", pMsg); + } + + if (msgType & 1u) { + if (code != 0 && terrno != 0) code = terrno; + SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle}; + dndSendRsp(pMgmt->pWrapper, &rsp); + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); +} + static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { dTrace("msg:%p, will be processed in vnode query queue", pMsg); vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); @@ -89,93 +126,112 @@ static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOf } } -static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) { - SRpcMsg *pMsg = &pNodeMsg->rpcMsg; +static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EVndQueueType qtype) { + SRpcMsg *pRpc = &pMsg->rpcMsg; + int32_t code = -1; - SMsgHead *pHead = pMsg->pCont; + SMsgHead *pHead = pRpc->pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { - dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId); + dError("vgId:%d, failed to write msg:%p to queue since %s", pHead->vgId, pMsg, terrstr()); + return -1; } - return pVnode; -} + switch (qtype) { + case VND_QUERY_QUEUE: + dTrace("msg:%p, will be written into vnode-query queue", pMsg); + code = taosWriteQitem(pVnode->pQueryQ, pMsg); + break; + case VND_FETCH_QUEUE: + dTrace("msg:%p, will be written into vnode-fetch queue", pMsg); + code = taosWriteQitem(pVnode->pFetchQ, pMsg); + break; + case VND_WRITE_QUEUE: + dTrace("msg:%p, will be written into vnode-write queue", pMsg); + code = taosWriteQitem(pVnode->pWriteQ, pMsg); + case VND_SYNC_QUEUE: + dTrace("msg:%p, will be written into vnode-sync queue", pMsg); + code = taosWriteQitem(pVnode->pSyncQ, pMsg); + default: + terrno = TSDB_CODE_INVALID_PARA; + break; + } -int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); - if (pVnode == NULL) return -1; - - int32_t code = taosWriteQitem(pVnode->pWriteQ, pMsg); vmReleaseVnode(pMgmt, pVnode); return code; } int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); - if (pVnode == NULL) return -1; + return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_SYNC_QUEUE); +} - int32_t code = taosWriteQitem(pVnode->pSyncQ, pMsg); - vmReleaseVnode(pMgmt, pVnode); - return code; +int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_WRITE_QUEUE); } int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); - if (pVnode == NULL) return -1; - - int32_t code = taosWriteQitem(pVnode->pQueryQ, pMsg); - vmReleaseVnode(pMgmt, pVnode); - return code; + return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_QUERY_QUEUE); } int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); + return vmPutNodeMsgToQueue(pMgmt, pMsg, VND_FETCH_QUEUE); +} + +int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { + SDnodeWorker *pWorker = &pMgmt->mgmtWorker; + dTrace("msg:%p, will be written to vnode-mgmt queue, worker:%s", pMsg, pWorker->name); + return dndWriteMsgToWorker(pWorker, pMsg); +} + +static int32_t vmPutRpcMsgToQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, EVndQueueType qtype) { + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + int32_t code = -1; + SMsgHead *pHead = pRpc->pCont; + + SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) return -1; - int32_t code = taosWriteQitem(pVnode->pFetchQ, pMsg); + SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); + if (pMsg != NULL) { + dTrace("msg:%p, is created, type:%s", pMsg, TMSG_INFO(pRpc->msgType)); + pMsg->rpcMsg = *pRpc; + switch (qtype) { + case VND_QUERY_QUEUE: + dTrace("msg:%p, will be put into vnode-query queue", pMsg); + code = taosWriteQitem(pVnode->pQueryQ, pMsg); + break; + case VND_FETCH_QUEUE: + dTrace("msg:%p, will be put into vnode-fetch queue", pMsg); + code = taosWriteQitem(pVnode->pFetchQ, pMsg); + break; + case VND_APPLY_QUEUE: + dTrace("msg:%p, will be put into vnode-apply queue", pMsg); + code = taosWriteQitem(pVnode->pApplyQ, pMsg); + break; + case VND_WRITE_QUEUE: + case VND_SYNC_QUEUE: + default: + terrno = TSDB_CODE_INVALID_PARA; + break; + } + } vmReleaseVnode(pMgmt, pVnode); return code; } int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; - - int32_t code = -1; - SMsgHead *pHead = pRpc->pCont; - // pHead->vgId = htonl(pHead->vgId); - - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); - if (pVnode == NULL) return -1; - - SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); - if (pMsg != NULL) { - pMsg->rpcMsg = *pRpc; - code = taosWriteQitem(pVnode->pQueryQ, pMsg); - } - vmReleaseVnode(pMgmt, pVnode); - return code; + return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_QUERY_QUEUE); } -int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pRpc) { - SVnodesMgmt *pMgmt = pWrapper->pMgmt; +int32_t vmPutMsgToFetchQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_FETCH_QUEUE); +} - int32_t code = -1; - SMsgHead *pHead = pRpc->pCont; - // pHead->vgId = htonl(pHead->vgId); - - SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); - if (pVnode == NULL) return -1; - - SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); - if (pMsg != NULL) { - pMsg->rpcMsg = *pRpc; - code = taosWriteQitem(pVnode->pApplyQ, pMsg); - } - vmReleaseVnode(pMgmt, pVnode); - return code; +int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { + return vmPutRpcMsgToQueue(pWrapper, pRpc, VND_APPLY_QUEUE); } int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { @@ -191,6 +247,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { return -1; } + dDebug("vgId:%d, vnode queue is alloced", pVnode->vgId); return 0; } @@ -205,43 +262,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pSyncQ = NULL; pVnode->pFetchQ = NULL; pVnode->pQueryQ = NULL; -} - -static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - int32_t code = -1; - tmsg_t msgType = pMsg->rpcMsg.msgType; - dTrace("msg:%p, will be processed in vnode-mgmt queue", pMsg); - - switch (msgType) { - case TDMT_DND_CREATE_VNODE: - code = vmProcessCreateVnodeReq(pMgmt, pMsg); - break; - case TDMT_DND_ALTER_VNODE: - code = vmProcessAlterVnodeReq(pMgmt, pMsg); - break; - case TDMT_DND_DROP_VNODE: - code = vmProcessDropVnodeReq(pMgmt, pMsg); - break; - case TDMT_DND_SYNC_VNODE: - code = vmProcessSyncVnodeReq(pMgmt, pMsg); - break; - case TDMT_DND_COMPACT_VNODE: - code = vmProcessCompactVnodeReq(pMgmt, pMsg); - break; - default: - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - dError("msg:%p, not processed in mgmt queue", pMsg); - } - - if (msgType & 1u) { - if (code != 0) code = terrno; - SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle}; - dndSendRsp(pMgmt->pWrapper, &rsp); - } - - dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - rpcFreeCont(pMsg->rpcMsg.pCont); - taosFreeQitem(pMsg); + dDebug("vgId:%d, vnode queue is freed", pVnode->vgId); } int32_t vmStartWorker(SVnodesMgmt *pMgmt) { @@ -275,7 +296,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { if (tWWorkerInit(pWPool) != 0) return -1; if (dndInitWorker(pMgmt, &pMgmt->mgmtWorker, DND_WORKER_SINGLE, "vnode-mgmt", 1, 1, vmProcessMgmtQueue) != 0) { - dError("failed to start dnode mgmt worker since %s", terrstr()); + dError("failed to start vnode-mgmt worker since %s", terrstr()); return -1; } @@ -291,9 +312,3 @@ void vmStopWorker(SVnodesMgmt *pMgmt) { tWWorkerCleanup(&pMgmt->syncPool); dDebug("vnode workers is closed"); } - -int32_t vmProcessMgmtMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { - SDnodeWorker *pWorker = &pMgmt->mgmtWorker; - dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg); -} \ No newline at end of file diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 36f95f233b..d762c8e4c0 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -62,6 +62,7 @@ typedef struct { typedef struct { uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) PutToQueueFp putToQueryQFp; + PutToQueueFp putToFetchQFp; SendReqFp sendReqFp; SendMnodeReqFp sendMnodeReqFp; SendRspFp sendRspFp; From 50229512d32ad7c1690f12c2a976a7c9bbaba37f Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 21 Mar 2022 14:39:35 +0800 Subject: [PATCH 3/4] refact vnode write queue --- source/dnode/mgmt/vnode/src/vmWorker.c | 32 ++++++++++++++++++------- source/dnode/vnode/inc/vnode.h | 3 +-- source/dnode/vnode/src/vnd/vnodeWrite.c | 5 ++-- 3 files changed, 27 insertions(+), 13 deletions(-) diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 29a8678701..33945d438c 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -63,36 +63,50 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); } +static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { + SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; + dndSendRsp(pWrapper, &rsp); +} + static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); + if (pArray == NULL) { + dError("failed to process %d msgs in write-queue since %s", numOfMsgs, terrstr()); + return; + } for (int32_t i = 0; i < numOfMsgs; ++i) { SNodeMsg *pMsg = NULL; - taosGetQitem(qall, (void **)&pMsg); - dTrace("msg:%p, will be processed in vnode write queue", pMsg); - void *ptr = taosArrayPush(pArray, &pMsg); - assert(ptr != NULL); + if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; + + dTrace("msg:%p, will be processed in vnode-write queue", pMsg); + if (taosArrayPush(pArray, &pMsg) == NULL) { + dTrace("msg:%p, failed to process since %s", pMsg, terrstr()); + vmSendRsp(pVnode->pWrapper, pMsg, TSDB_CODE_OUT_OF_MEMORY); + } } vnodeProcessWMsgs(pVnode->pImpl, pArray); - for (size_t i = 0; i < numOfMsgs; i++) { - SRpcMsg *pRsp = NULL; + numOfMsgs = taosArrayGetSize(pArray); + for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); SRpcMsg *pRpc = &pMsg->rpcMsg; - int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp); + SRpcMsg *pRsp = NULL; + + int32_t code = vnodeApplyWMsg(pVnode->pImpl, pRpc, &pRsp); if (pRsp != NULL) { pRsp->ahandle = pRpc->ahandle; dndSendRsp(pVnode->pWrapper, pRsp); free(pRsp); } else { - if (code != 0) code = terrno; + if (code != 0 && terrno != 0) code = terrno; SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code}; dndSendRsp(pVnode->pWrapper, &rpcRsp); } } - for (size_t i = 0; i < numOfMsgs; i++) { + for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->rpcMsg.pCont); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index d762c8e4c0..1ffd4e0d78 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -126,9 +126,8 @@ void vnodeDestroy(const char *path); * * @param pVnode The vnode object. * @param pMsgs The array of SRpcMsg - * @return int 0 for success, -1 for failure */ -int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); +void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); /** * @brief Apply a write request message. diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 3fa987ab9b..d3769b8a30 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -16,7 +16,7 @@ #include "tq.h" #include "vnd.h" -int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { +void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { SNodeMsg *pMsg; SRpcMsg *pRpc; @@ -40,7 +40,8 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { // TODO: Integrate RAFT module here - return 0; + // No results are returned because error handling is difficult + // return 0; } int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { From 952bf4f0972bbd07554b159758bfbc6a682448e5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 21 Mar 2022 15:13:30 +0800 Subject: [PATCH 4/4] process query msg --- source/dnode/mgmt/vnode/src/vmWorker.c | 36 +++++++++++++++++--------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 33945d438c..fe01b19d2d 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -16,6 +16,11 @@ #define _DEFAULT_SOURCE #include "vmInt.h" +static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { + SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; + dndSendRsp(pWrapper, &rsp); +} + static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType; @@ -44,8 +49,7 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { if (msgType & 1u) { if (code != 0 && terrno != 0) code = terrno; - SRpcMsg rsp = {.code = code, .handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle}; - dndSendRsp(pMgmt->pWrapper, &rsp); + vmSendRsp(pMgmt->pWrapper, pMsg, code); } dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); @@ -54,18 +58,27 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { } static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { - dTrace("msg:%p, will be processed in vnode query queue", pMsg); - vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); + dTrace("msg:%p, will be processed in vnode-query queue", pMsg); + int32_t code = vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); + if (code != 0) { + vmSendRsp(pVnode->pWrapper, pMsg, code); + } + + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { - dTrace("msg:%p, will be processed in vnode fetch queue", pMsg); - vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); -} + dTrace("msg:%p, will be processed in vnode-fetch queue", pMsg); + int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); + if (code != 0) { + vmSendRsp(pVnode->pWrapper, pMsg, code); + } -static void vmSendRsp(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t code) { - SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .ahandle = pMsg->rpcMsg.ahandle, .code = code}; - dndSendRsp(pWrapper, &rsp); + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); } static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { @@ -101,8 +114,7 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO free(pRsp); } else { if (code != 0 && terrno != 0) code = terrno; - SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code}; - dndSendRsp(pVnode->pWrapper, &rpcRsp); + vmSendRsp(pVnode->pWrapper, pMsg, code); } }