From 21a9367d679cb01c8e06f2811cc812c15c01921d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 14 May 2022 23:26:28 +0800 Subject: [PATCH] refactor: transport --- .../mgmt/node_mgmt/src/{dmRun.c => dmNodes.c} | 0 source/dnode/mgmt/node_mgmt/src/dmProc.c | 3 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 252 ++++++++---------- source/dnode/mnode/impl/test/CMakeLists.txt | 2 +- 4 files changed, 115 insertions(+), 142 deletions(-) rename source/dnode/mgmt/node_mgmt/src/{dmRun.c => dmNodes.c} (100%) diff --git a/source/dnode/mgmt/node_mgmt/src/dmRun.c b/source/dnode/mgmt/node_mgmt/src/dmNodes.c similarity index 100% rename from source/dnode/mgmt/node_mgmt/src/dmRun.c rename to source/dnode/mgmt/node_mgmt/src/dmNodes.c diff --git a/source/dnode/mgmt/node_mgmt/src/dmProc.c b/source/dnode/mgmt/node_mgmt/src/dmProc.c index 3b55fb8b07..5ce7bb5b14 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmProc.c +++ b/source/dnode/mgmt/node_mgmt/src/dmProc.c @@ -398,8 +398,7 @@ static void *dmConsumParentQueue(void *param) { rpcFreeCont(pBody); } else if (ftype == PROC_FUNC_RELEASE) { pRsp = pHead; - dTrace("node:%s, release msg:%p from parent queue, code:0x%04x handle:%p", proc->name, pRsp, code, - pRsp->handle); + dTrace("node:%s, release msg:%p from parent queue, code:0x%04x handle:%p", proc->name, pRsp, code, pRsp->handle); dmRemoveProcRpcHandle(proc, pRsp->handle); rpcReleaseHandle(pRsp->handle, (int8_t)pRsp->code); rpcFreeCont(pBody); diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index e69d1d2417..6d5bedfff5 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -41,15 +41,6 @@ static void dmSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { taosWUnLockLatch(&pData->latch); } -static inline NodeMsgFp dmGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { - NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; - if (msgFp == NULL) { - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - } - - return msgFp; -} - static inline int32_t dmBuildNodeMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) { SRpcConnInfo connInfo = {0}; if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { @@ -62,66 +53,124 @@ static inline int32_t dmBuildNodeMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) { pMsg->clientIp = connInfo.clientIp; pMsg->clientPort = connInfo.clientPort; memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg)); - if ((pRpc->msgType & 1u)) { - assert(pRpc->refId != 0); - } - return 0; } int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { - SRpcMsg *pRpc = &pMsg->rpcMsg; - - if (InParentProc(pWrapper->proc.ptype)) { - dTrace("msg:%p, created and put into child queue, type:%s handle:%p user:%s code:0x%04x contLen:%d", pMsg, - TMSG_INFO(pRpc->msgType), pRpc->handle, pMsg->user, pRpc->code & 0XFFFF, pRpc->contLen); - return dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, - ((pRpc->msgType & 1U) && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, - PROC_FUNC_REQ); - } else { - dTrace("msg:%p, created, type:%s handle:%p user:%s", pMsg, TMSG_INFO(pRpc->msgType), pRpc->handle, pMsg->user); - NodeMsgFp msgFp = dmGetMsgFp(pWrapper, &pMsg->rpcMsg); - if (msgFp == NULL) return -1; - return (*msgFp)(pWrapper->pMgmt, pMsg); + NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->rpcMsg.msgType)]; + if (msgFp == NULL) { + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + return -1; } + + dTrace("msg:%p, will be processed, handle:%p", pMsg, pMsg->rpcMsg.handle); + return (*msgFp)(pWrapper->pMgmt, pMsg); } -static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { - int32_t code = -1; - SNodeMsg *pMsg = NULL; - uint16_t msgType = pRpc->msgType; - bool needRelease = false; - bool isReq = msgType & 1U; +static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { + SDnodeTrans *pTrans = &pDnode->trans; + int32_t code = -1; + SNodeMsg *pMsg = NULL; + tmsg_t msgType = pRpc->msgType; + bool isReq = msgType & 1u; + bool needRelease = false; + SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; + SMgmtWrapper *pWrapper = NULL; - if (dmMarkWrapper(pWrapper) != 0) goto _OVER; - needRelease = true; + if (msgType == TDMT_DND_NET_TEST) { + dmProcessNetTestReq(pDnode, pRpc); + code = 0; + goto _OVER; + } else if (msgType == TDMT_MND_SYSTABLE_RETRIEVE_RSP || msgType == TDMT_VND_FETCH_RSP) { + code = qWorkerProcessFetchRsp(NULL, NULL, pRpc); + pRpc->pCont = NULL; // will be freed in qworker + code = 0; + goto _OVER; + } else { + } - if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM)) == NULL) goto _OVER; - if (dmBuildNodeMsg(pMsg, pRpc) != 0) goto _OVER; + if (pDnode->status != DND_STAT_RUNNING) { + if (msgType == TDMT_DND_SERVER_STATUS) { + dmProcessServerStartupStatus(pDnode, pRpc); + code = 0; + } else { + terrno = TSDB_CODE_APP_NOT_READY; + } + goto _OVER; + } - code = dmProcessNodeMsg(pWrapper, pMsg); + if (isReq && pRpc->pCont == NULL) { + terrno = TSDB_CODE_INVALID_MSG_LEN; + goto _OVER; + } + + if (pHandle->defaultNtype == NODE_END) { + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + goto _OVER; + } else { + pWrapper = &pDnode->wrappers[pHandle->defaultNtype]; + if (pHandle->needCheckVgId) { + SMsgHead *pHead = pRpc->pCont; + int32_t vgId = ntohl(pHead->vgId); + if (vgId == QNODE_HANDLE) { + pWrapper = &pDnode->wrappers[QNODE]; + } else if (vgId == MNODE_HANDLE) { + pWrapper = &pDnode->wrappers[MNODE]; + } else { + } + } + } + + if (dmMarkWrapper(pWrapper) != 0) { + goto _OVER; + } else { + needRelease = true; + } + + dTrace("msg:%s is received, handle:%p app:%p", TMSG_INFO(msgType), pRpc->handle, pRpc->ahandle); + pMsg = taosAllocateQitem(sizeof(SNodeMsg), RPC_QITEM); + if (pMsg == NULL) { + goto _OVER; + } + + if (dmBuildNodeMsg(pMsg, pRpc) != 0) { + goto _OVER; + } + + if (InParentProc(pWrapper->proc.ptype)) { + dTrace("msg:%p, put into child queue, handle:%p", pMsg, pRpc->handle); + code = dmPutToProcCQueue(&pWrapper->proc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, + (isReq && (pRpc->code == 0)) ? pRpc->handle : NULL, pRpc->refId, PROC_FUNC_REQ); + } else { + code = dmProcessNodeMsg(pWrapper, pMsg); + } _OVER: if (code == 0) { - if (InParentProc(pWrapper->proc.ptype)) { + if (pWrapper != NULL && InParentProc(pWrapper->proc.ptype)) { dTrace("msg:%p, freed in parent process", pMsg); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); } } else { - dError("msg:%p, type:%s handle:%p failed to process since 0x%04x:%s", pMsg, TMSG_INFO(msgType), pRpc->handle, - code & 0XFFFF, terrstr()); + dError("msg:%p, failed to process since %s", pMsg, terrstr()); + if (terrno != 0) code = terrno; + if (isReq) { - if (terrno != 0) code = terrno; if (code == TSDB_CODE_NODE_NOT_DEPLOYED || code == TSDB_CODE_NODE_OFFLINE) { if (msgType > TDMT_MND_MSG && msgType < TDMT_VND_MSG) { code = TSDB_CODE_NODE_REDIRECT; } } - - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code, .refId = pRpc->refId}; - tmsgSendRsp(&rsp); + SRpcMsg rspMsg = { + .handle = pRpc->handle, + .code = code, + .ahandle = pRpc->ahandle, + .refId = pRpc->refId, + }; + tmsgSendRsp(&rspMsg); } + dTrace("msg:%p, is freed", pMsg); taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); @@ -132,83 +181,6 @@ _OVER: } } -static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SDnodeTrans *pTrans = &pDnode->trans; - tmsg_t msgType = pMsg->msgType; - bool isReq = msgType & 1u; - SMsgHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(msgType)]; - SMgmtWrapper *pWrapper = NULL; - - switch (msgType) { - case TDMT_DND_SERVER_STATUS: - if (pDnode->status != DND_STAT_RUNNING) { - dTrace("server status req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); - dmProcessServerStartupStatus(pDnode, pMsg); - return; - } else { - break; - } - case TDMT_DND_NET_TEST: - dTrace("net test req will be processed, handle:%p, app:%p", pMsg->handle, pMsg->ahandle); - dmProcessNetTestReq(pDnode, pMsg); - return; - case TDMT_MND_SYSTABLE_RETRIEVE_RSP: - case TDMT_VND_FETCH_RSP: - dTrace("retrieve rsp is received"); - qWorkerProcessFetchRsp(NULL, NULL, pMsg); - pMsg->pCont = NULL; // already freed in qworker - return; - } - - if (pDnode->status != DND_STAT_RUNNING) { - dError("msg:%s ignored since dnode not running, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle); - if (isReq) { - SRpcMsg rspMsg = { - .handle = pMsg->handle, .code = TSDB_CODE_APP_NOT_READY, .ahandle = pMsg->ahandle, .refId = pMsg->refId}; - rpcSendResponse(&rspMsg); - } - rpcFreeCont(pMsg->pCont); - return; - } - - if (isReq && pMsg->pCont == NULL) { - dError("req:%s not processed since its empty, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle); - SRpcMsg rspMsg = { - .handle = pMsg->handle, .code = TSDB_CODE_INVALID_MSG_LEN, .ahandle = pMsg->ahandle, .refId = pMsg->refId}; - rpcSendResponse(&rspMsg); - return; - } - - if (pHandle->defaultNtype == NODE_END) { - dError("msg:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pMsg->handle, pMsg->ahandle); - if (isReq) { - SRpcMsg rspMsg = { - .handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED, .ahandle = pMsg->ahandle, .refId = pMsg->refId}; - rpcSendResponse(&rspMsg); - } - rpcFreeCont(pMsg->pCont); - return; - } - - pWrapper = &pDnode->wrappers[pHandle->defaultNtype]; - if (pHandle->needCheckVgId) { - SMsgHead *pHead = pMsg->pCont; - int32_t vgId = ntohl(pHead->vgId); - if (vgId == QNODE_HANDLE) { - pWrapper = &pDnode->wrappers[QNODE]; - } else if (vgId == MNODE_HANDLE) { - pWrapper = &pDnode->wrappers[MNODE]; - } else { - } - } - - dTrace("msg:%s will be processed by %s, app:%p", TMSG_INFO(msgType), pWrapper->name, pMsg->ahandle); - if (isReq) { - assert(pMsg->refId != 0); - } - dmProcessRpcMsg(pWrapper, pMsg, pEpSet); -} - int32_t dmInitMsgHandle(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; @@ -248,17 +220,19 @@ static void dmSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { epSet.eps[i].port = htons(epSet.eps[i].port); } - SRpcMsg resp; + SMEpSet msg = {.epSet = epSet}; int32_t len = tSerializeSMEpSet(NULL, 0, &msg); - resp.pCont = rpcMallocCont(len); - resp.contLen = len; - tSerializeSMEpSet(resp.pCont, len, &msg); - resp.code = TSDB_CODE_RPC_REDIRECT; - resp.handle = pReq->handle; - resp.refId = pReq->refId; - rpcSendResponse(&resp); + SRpcMsg rsp = { + .code = TSDB_CODE_RPC_REDIRECT, + .handle = pReq->handle, + .refId = pReq->refId, + .contLen = len, + }; + rsp.pCont = rpcMallocCont(len); + tSerializeSMEpSet(rsp.pCont, len, &msg); + rpcSendResponse(&rsp); } static inline void dmSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) { @@ -309,17 +283,17 @@ static inline void dmSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { static inline void dmSendRedirectRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp, const SEpSet *pNewEpSet) { if (!InChildProc(pWrapper->proc.ptype)) { - SRpcMsg resp = {0}; + SRpcMsg rsp = {0}; SMEpSet msg = {.epSet = *pNewEpSet}; int32_t len = tSerializeSMEpSet(NULL, 0, &msg); - resp.pCont = rpcMallocCont(len); - resp.contLen = len; - tSerializeSMEpSet(resp.pCont, len, &msg); + rsp.pCont = rpcMallocCont(len); + rsp.contLen = len; + tSerializeSMEpSet(rsp.pCont, len, &msg); - resp.code = TSDB_CODE_RPC_REDIRECT; - resp.handle = pRsp->handle; - resp.refId = pRsp->refId; - rpcSendResponse(&resp); + rsp.code = TSDB_CODE_RPC_REDIRECT; + rsp.handle = pRsp->handle; + rsp.refId = pRsp->refId; + rpcSendResponse(&rsp); } else { dmPutToProcPQueue(&pWrapper->proc, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP); } @@ -356,7 +330,7 @@ int32_t dmInitClient(SDnode *pDnode) { SRpcInit rpcInit = {0}; rpcInit.label = "DND"; rpcInit.numOfThreads = 1; - rpcInit.cfp = (RpcCfp)dmProcessMsg; + rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = 1024; rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.idleTime = tsShellActivityTimer * 1000; @@ -459,7 +433,7 @@ int32_t dmInitServer(SDnode *pDnode) { rpcInit.localPort = pDnode->data.serverPort; rpcInit.label = "DND"; rpcInit.numOfThreads = tsNumOfRpcThreads; - rpcInit.cfp = (RpcCfp)dmProcessMsg; + rpcInit.cfp = (RpcCfp)dmProcessRpcMsg; rpcInit.sessions = tsMaxShellConns; rpcInit.connType = TAOS_CONN_SERVER; rpcInit.idleTime = tsShellActivityTimer * 1000; diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt index b6e3c8f3b4..3b1ca0999c 100644 --- a/source/dnode/mnode/impl/test/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/CMakeLists.txt @@ -3,7 +3,7 @@ enable_testing() add_subdirectory(acct) add_subdirectory(bnode) add_subdirectory(db) -add_subdirectory(dnode) +#add_subdirectory(dnode) add_subdirectory(func) #add_subdirectory(mnode) add_subdirectory(profile)