From 17215ffe90847aa31240f37a7f727c7187f255cb Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 18 Mar 2022 19:43:09 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/container/src/dndMsg.c | 1 - source/dnode/mgmt/dnode/src/dmWorker.c | 1 - source/dnode/mgmt/vnode/src/vmWorker.c | 88 +++++++++++------------- source/dnode/vnode/src/vnd/vnodeWrite.c | 10 +-- 4 files changed, 45 insertions(+), 55 deletions(-) diff --git a/source/dnode/mgmt/container/src/dndMsg.c b/source/dnode/mgmt/container/src/dndMsg.c index cf8fb9e6f5..21e9cc71a5 100644 --- a/source/dnode/mgmt/container/src/dndMsg.c +++ b/source/dnode/mgmt/container/src/dndMsg.c @@ -161,7 +161,6 @@ int32_t dndProcessNodeMsg(SDnode *pDnode, SNodeMsg *pMsg) { return dndProcessCreateNodeMsg(pDnode, BNODE, pMsg); case TDMT_DND_DROP_BNODE: return dndProcessDropNodeMsg(pDnode, BNODE, pMsg); - default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; return -1; diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 6b2e1277d1..1e175e37ff 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -83,7 +83,6 @@ static void dmProcessQueue(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) { break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; - code = -1; dError("msg:%p, type:%s not processed in dnode queue", pRpc->handle, TMSG_INFO(pRpc->msgType)); } diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 161d7276a0..6d75c6c3e8 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -16,9 +16,15 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -static void vmProcessQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessQueryMsg(pVnode->pImpl, pMsg); } +static void vmProcessQueryQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { + dTrace("msg:%p, will be processed in vnode query queue", pMsg); + vnodeProcessQueryMsg(pVnode->pImpl, &pMsg->rpcMsg); +} -static void vmProcessFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg) { vnodeProcessFetchMsg(pVnode->pImpl, pMsg); } +static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) { + dTrace("msg:%p, will be processed in vnode fetch queue", pMsg); + vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); +} static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *)); @@ -60,43 +66,29 @@ static void vmProcessWriteQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numO } static void vmProcessApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { - SRpcMsg *pMsg = NULL; + SNodeMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); // todo SRpcMsg *pRsp = NULL; - (void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); + (void)vnodeApplyWMsg(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); } } static void vmProcessSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t numOfMsgs) { - SRpcMsg *pMsg = NULL; + SNodeMsg *pMsg = NULL; for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); // todo SRpcMsg *pRsp = NULL; - (void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp); + (void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); } } -static int32_t vmWriteMsgToQueue(STaosQueue *pQueue, SNodeMsg *pMsg, bool sendRsp) { - int32_t code = taosWriteQitem(pQueue, pMsg); - - if (code != TSDB_CODE_SUCCESS && sendRsp) { - if (pMsg->rpcMsg.msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .code = code}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->rpcMsg.pCont); - } - - return code; -} - static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) { SRpcMsg *pMsg = &pNodeMsg->rpcMsg; @@ -107,11 +99,6 @@ static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) { SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { dError("vgId:%d, failed to acquire vnode while process req", pHead->vgId); - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); } return pVnode; @@ -119,34 +106,38 @@ static SVnodeObj *vmAcquireFromMsg(SVnodesMgmt *pMgmt, SNodeMsg *pNodeMsg) { int32_t vmProcessWriteMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); - if (pVnode != NULL) { - (void)vmWriteMsgToQueue(pVnode->pWriteQ, pMsg, true); - vmReleaseVnode(pMgmt, pVnode); - } + if (pVnode == NULL) return -1; + + int32_t code = taosWriteQitem(pVnode->pWriteQ, pMsg); + vmReleaseVnode(pMgmt, pVnode); + return code; } int32_t vmProcessSyncMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); - if (pVnode != NULL) { - (void)vmWriteMsgToQueue(pVnode->pSyncQ, pMsg, true); - vmReleaseVnode(pMgmt, pVnode); - } + if (pVnode == NULL) return -1; + + int32_t code = taosWriteQitem(pVnode->pSyncQ, pMsg); + vmReleaseVnode(pMgmt, pVnode); + return code; } int32_t vmProcessQueryMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); - if (pVnode != NULL) { - (void)vmWriteMsgToQueue(pVnode->pQueryQ, pMsg, true); - vmReleaseVnode(pMgmt, pVnode); - } + if (pVnode == NULL) return -1; + + int32_t code = taosWriteQitem(pVnode->pQueryQ, pMsg); + vmReleaseVnode(pMgmt, pVnode); + return code; } int32_t vmProcessFetchMsg(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { SVnodeObj *pVnode = vmAcquireFromMsg(pMgmt, pMsg); - if (pVnode != NULL) { - (void)vmWriteMsgToQueue(pVnode->pFetchQ, pMsg, true); - vmReleaseVnode(pMgmt, pVnode); - } + if (pVnode == NULL) return -1; + + int32_t code = taosWriteQitem(pVnode->pFetchQ, pMsg); + vmReleaseVnode(pMgmt, pVnode); + return code; } int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { @@ -161,7 +152,8 @@ int32_t vmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg != NULL) { - code = vmWriteMsgToQueue(pVnode->pQueryQ, pMsg, false); + pMsg->rpcMsg = *pRpc; + code = taosWriteQitem(pVnode->pQueryQ, pMsg); } vmReleaseVnode(pMgmt, pVnode); return code; @@ -179,7 +171,8 @@ int32_t vmPutMsgToApplyQueue(SMgmtWrapper *pWrapper, int32_t vgId, SRpcMsg *pRpc SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); if (pMsg != NULL) { - code = vmWriteMsgToQueue(pVnode->pApplyQ, pMsg, false); + pMsg->rpcMsg = *pRpc; + code = taosWriteQitem(pVnode->pApplyQ, pMsg); } vmReleaseVnode(pMgmt, pVnode); return code; @@ -217,7 +210,7 @@ void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { int32_t code = -1; tmsg_t msgType = pMsg->rpcMsg.msgType; - dTrace("msg:%p, will be processed", pMsg); + dTrace("msg:%p, will be processed in vnode mgmt queue", pMsg); switch (msgType) { case TDMT_DND_CREATE_VNODE: @@ -237,9 +230,7 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { break; default: terrno = TSDB_CODE_MSG_NOT_PROCESSED; - code = -1; - dError("RPC %p, dnode msg:%s not processed", pMsg->rpcMsg.handle, TMSG_INFO(msgType)); - break; + dError("msg:%p, not processed in mgmt queue", pMsg); } if (msgType & 1u) { @@ -248,10 +239,9 @@ static void vmProcessMgmtQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { rpcSendResponse(&rsp); } + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); rpcFreeCont(pMsg->rpcMsg.pCont); - pMsg->rpcMsg.pCont = NULL; taosFreeQitem(pMsg); - dTrace("msg:%p, is freed", pMsg); } int32_t vmStartWorker(SVnodesMgmt *pMgmt) { diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 218b53c2ab..2a75eebfd9 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -17,17 +17,19 @@ #include "vnd.h" int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { - SRpcMsg *pMsg; + SNodeMsg *pMsg; + SRpcMsg *pRpc; for (int i = 0; i < taosArrayGetSize(pMsgs); i++) { - pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i); + pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i); + pRpc = &pMsg->rpcMsg; // set request version - void *pBuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead)); int64_t ver = pVnode->state.processed++; taosEncodeFixedI64(&pBuf, ver); - if (walWrite(pVnode->pWal, ver, pMsg->msgType, pMsg->pCont, pMsg->contLen) < 0) { + if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { // TODO: handle error /*ASSERT(false);*/ vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr());