From 143a3113609366f5e1522feff4f5df98489a2e60 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Mar 2022 11:59:22 +0800 Subject: [PATCH] shm --- include/libs/transport/trpc.h | 1 - source/dnode/mgmt/container/inc/dndInt.h | 8 +-- source/dnode/mgmt/container/src/dndInt.c | 6 +- source/dnode/mgmt/container/src/dndNode.c | 61 ++++++++++++++++++- .../dnode/mgmt/container/src/dndTransport.c | 16 ++--- source/dnode/mgmt/dnode/inc/dmWorker.h | 2 +- source/dnode/mgmt/dnode/src/dmWorker.c | 11 +--- source/dnode/mgmt/mnode/inc/mmWorker.h | 6 +- source/dnode/mgmt/mnode/src/mmWorker.c | 6 +- source/dnode/mgmt/vnode/inc/vmWorker.h | 8 +-- source/dnode/mgmt/vnode/src/vmWorker.c | 8 +-- 11 files changed, 88 insertions(+), 45 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 1f9cc06946..7f02eafe3f 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -49,7 +49,6 @@ typedef struct SRpcMsg { typedef struct { char user[TSDB_USER_LEN]; SRpcMsg rpcMsg; - SEpSet rpcEpSet; int32_t rspLen; void *pRsp; } SNodeMsg; diff --git a/source/dnode/mgmt/container/inc/dndInt.h b/source/dnode/mgmt/container/inc/dndInt.h index 8c85d67d37..90f9e86c25 100644 --- a/source/dnode/mgmt/container/inc/dndInt.h +++ b/source/dnode/mgmt/container/inc/dndInt.h @@ -72,15 +72,13 @@ typedef struct SQnodeMgmt SQnodeMgmt; typedef struct SSnodeMgmt SSnodeMgmt; typedef struct SBnodeMgmt SBnodeMgmt; -typedef void (*RpcMsgFp)(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEps); -typedef void (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +typedef int32_t (*NodeMsgFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); typedef int32_t (*OpenNodeFp)(SMgmtWrapper *pWrapper); typedef void (*CloseNodeFp)(SMgmtWrapper *pWrapper); typedef bool (*RequireNodeFp)(SMgmtWrapper *pWrapper); typedef struct SMsgHandle { - RpcMsgFp rpcMsgFp; - NodeMsgFp nodeMsgFp; + NodeMsgFp msgFp; SMgmtWrapper *pWrapper; } SMsgHandle; @@ -98,7 +96,7 @@ typedef struct SMgmtWrapper { SProcObj *pProc; void *pMgmt; SDnode *pDnode; - SMsgHandle msgHandles[TDMT_MAX]; + NodeMsgFp msgFps[TDMT_MAX]; SMgmtFp fp; } SMgmtWrapper; diff --git a/source/dnode/mgmt/container/src/dndInt.c b/source/dnode/mgmt/container/src/dndInt.c index ca3f1e14ea..17757a75d5 100644 --- a/source/dnode/mgmt/container/src/dndInt.c +++ b/source/dnode/mgmt/container/src/dndInt.c @@ -65,11 +65,7 @@ void dndCleanup() { SMgmtWrapper *dndGetWrapper(SDnode *pDnode, ENodeType nodeType) { return &pDnode->wrappers[nodeType]; } void dndSetMsgHandle(SMgmtWrapper *pWrapper, int32_t msgType, NodeMsgFp nodeMsgFp) { - SMsgHandle *pHandle = &pWrapper->msgHandles[TMSG_INDEX(msgType)]; - - pHandle->pWrapper = pWrapper; - pHandle->nodeMsgFp = nodeMsgFp; - pHandle->rpcMsgFp = dndProcessRpcMsg; + pWrapper->msgFps[TMSG_INDEX(msgType)] = nodeMsgFp; } EDndStatus dndGetStatus(SDnode *pDnode) { return pDnode->status; } diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index ea198bd42d..6b41f4b5d2 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -257,8 +257,65 @@ void dndeHandleEvent(SDnode *pDnode, EDndEvent event) { pDnode->event = event; } -void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, SEpSet *pEpSet) { - if (pEpSet && pEpSet->numOfEps > 0 && pMsg->msgType == TDMT_MND_STATUS_RSP) { +static int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc, SEpSet *pEpSet) { + SRpcConnInfo connInfo = {0}; + if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { + terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; + dError("failed to build msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle); + return -1; + } + + memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); + pMsg->rpcMsg = *pRpc; + + return 0; +} + +static void dndSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) { + if (pRpc->code == TSDB_CODE_APP_NOT_READY) { + dmSendRedirectRsp(pDnode, pRpc); + } else { + rpcSendResponse(pRpc); + } +} + +void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { + if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) { dmUpdateMnodeEpSet(pWrapper->pDnode, pEpSet); } + + int32_t code = -1; + SNodeMsg *pMsg = NULL; + + NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; + if (msgFp == NULL) { + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + goto _OVER; + } + + pMsg = taosAllocateQitem(sizeof(SNodeMsg)); + if (pMsg == NULL) { + goto _OVER; + } + + if (dndBuildMsg(pMsg, pRpc, pEpSet) != 0) { + goto _OVER; + } + + dTrace("msg:%p, is created, app:%p RPC:%p user:%s, processd by %s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user, + pWrapper->name); + code = (*msgFp)(pWrapper, pMsg); + +_OVER: + + if (code != 0) { + bool isReq = (pRpc->msgType & 1U); + if (isReq) { + SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; + dndSendRpcRsp(pWrapper->pDnode, &rsp); + } + dTrace("msg:%p, is freed", pMsg); + taosFreeQitem(pMsg); + rpcFreeCont(pRpc->pCont); + } } \ No newline at end of file diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index 29f1e42ffc..81b6818fbc 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -36,10 +36,10 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { } SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; - if (pHandle->rpcMsgFp != NULL) { + if (pHandle->msgFp != NULL) { dTrace("RPC %p, rsp:%s will be processed by %s, code:0x%x app:%p", pRsp->handle, TMSG_INFO(msgType), pHandle->pWrapper->name, pRsp->code & 0XFFFF, pRsp->ahandle); - (*pHandle->rpcMsgFp)(pHandle->pWrapper, pRsp, pEpSet); + dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet); } else { dError("RPC %p, rsp:%s not processed, app:%p", pRsp->handle, TMSG_INFO(msgType), pRsp->ahandle); rpcFreeCont(pRsp->pCont); @@ -118,10 +118,10 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) { } SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; - if (pHandle->rpcMsgFp != NULL) { + if (pHandle->msgFp != NULL) { dTrace("RPC %p, req:%s will be processed by %s, app:%p", pReq->handle, TMSG_INFO(msgType), pHandle->pWrapper->name, pReq->ahandle); - (*pHandle->rpcMsgFp)(pHandle->pWrapper, pReq, pEpSet); + dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet); } else { dError("RPC %p, req:%s not processed since no handle, app:%p", pReq->handle, TMSG_INFO(msgType), pReq->ahandle); SRpcMsg rspMsg = {.handle = pReq->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pReq->ahandle}; @@ -253,17 +253,17 @@ int32_t dndInitMsgHandle(SDnode *pDnode) { SMgmtWrapper *pWrapper = &pDnode->wrappers[nodeType]; for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { - SMsgHandle msgHandle = pWrapper->msgHandles[msgIndex]; - if (msgHandle.rpcMsgFp == NULL) continue; + NodeMsgFp msgFp = pWrapper->msgFps[msgIndex]; + if (msgFp == NULL) continue; SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex]; - if (pHandle->rpcMsgFp != NULL) { + if (pHandle->msgFp != NULL) { dError("msg:%s, has multiple process nodes, prev node:%s, curr node:%s", tMsgInfo[msgIndex], pHandle->pWrapper->name, pWrapper->name); return -1; } else { dDebug("msg:%s, will be processed by node:%s", tMsgInfo[msgIndex], pWrapper->name); - *pHandle = msgHandle; + pHandle->msgFp = msgFp; } } } diff --git a/source/dnode/mgmt/dnode/inc/dmWorker.h b/source/dnode/mgmt/dnode/inc/dmWorker.h index 60c5458e9c..992dca1ea7 100644 --- a/source/dnode/mgmt/dnode/inc/dmWorker.h +++ b/source/dnode/mgmt/dnode/inc/dmWorker.h @@ -24,7 +24,7 @@ extern "C" { int32_t dmStartWorker(); void dmStopWorker(); -void dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 97da028ba0..80acec2262 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -161,7 +161,7 @@ void dmStopWorker(SDnodeMgmt *pMgmt) { } } -void dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { +int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { SDnodeMgmt *pMgmt = pWrapper->pMgmt; SDnodeWorker *pWorker = &pMgmt->mgmtWorker; @@ -169,12 +169,5 @@ void dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { pWorker = &pMgmt->statusWorker; } - if (dndWriteMsgToWorker(pWorker, pMsg, sizeof(SNodeMsg)) != 0) { - if (pMsg->rpcMsg.msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle, .code = TSDB_CODE_OUT_OF_MEMORY}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->rpcMsg.pCont); - taosFreeQitem(pMsg); - } + return dndWriteMsgToWorker(pWorker, pMsg, sizeof(SNodeMsg)); } \ No newline at end of file diff --git a/source/dnode/mgmt/mnode/inc/mmWorker.h b/source/dnode/mgmt/mnode/inc/mmWorker.h index 553786f444..a1dbd5c267 100644 --- a/source/dnode/mgmt/mnode/inc/mmWorker.h +++ b/source/dnode/mgmt/mnode/inc/mmWorker.h @@ -31,9 +31,9 @@ int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t mmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 42b0dc5292..8b8b472f1b 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -268,6 +268,6 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { #endif -void mmProcessWriteMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {} -void mmProcessSyncMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {} -void mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {} \ No newline at end of file +int32_t mmProcessWriteMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;} +int32_t mmProcessSyncMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;} +int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;} \ No newline at end of file diff --git a/source/dnode/mgmt/vnode/inc/vmWorker.h b/source/dnode/mgmt/vnode/inc/vmWorker.h index b13d2412c0..e64ecd8dbf 100644 --- a/source/dnode/mgmt/vnode/inc/vmWorker.h +++ b/source/dnode/mgmt/vnode/inc/vmWorker.h @@ -31,10 +31,10 @@ int32_t vmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); void vmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); void vmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t vmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index a7111e743c..ae11a92004 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "vmWorker.h" -void vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} -void vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} -void vmProcessQueryMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} -void vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){} \ No newline at end of file +int32_t vmProcessWriteMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){return 0;} +int32_t vmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){return 0;} +int32_t vmProcessQueryMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg){return 0;} +int32_t vmProcessFetchMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg){return 0;} \ No newline at end of file