diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 24ded1f90a..a01355c099 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -116,15 +116,6 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr */ int32_t mndRetriveAuth(SMnode *pMnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); -/** - * @brief Initialize mnode msg. - * - * @param pMnode The mnode object. - * @param pMsg The request rpc msg. - * @return int32_t The created mnode msg. - */ -int32_t mndBuildMsg(SMnodeMsg *pMnodeMsg, SRpcMsg *pRpcMsg); - /** * @brief Cleanup mnode msg. * diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 445a8bbf31..d31c2d7312 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -82,6 +82,7 @@ typedef struct { SProcObj *pProcess; bool singleProc; bool isChild; + bool testFlag; } SMnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c index 2ee12e66ca..788a6d5559 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c @@ -117,8 +117,9 @@ void mmRelease(SDnode *pDnode, SMnode *pMnode) { int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->singleProc = true; + pMgmt->singleProc = false; pMgmt->isChild = false; + pMgmt->testFlag = true; int32_t code = mmOpenImp(pDnode, pOption); diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c index 1990dab822..706c451f67 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c @@ -20,11 +20,11 @@ #include "dndTransport.h" #include "dndWorker.h" -static int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg); -static int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg); -static int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg); -static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMnodeMsg); -static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg); +static int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMsg); +static int32_t mmProcessSyncMsg(SDnode *pDnode, SMnodeMsg *pMsg); +static int32_t mmProcessReadMsg(SDnode *pDnode, SMnodeMsg *pMsg); +static int32_t mmPutMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SMnodeMsg *pMsg); +static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg); static void mmConsumeQueue(SDnode *pDnode, SMnodeMsg *pMsg); int32_t mmStartWorker(SDnode *pDnode) { @@ -139,54 +139,76 @@ void mmInitMsgFp(SMnodeMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessWriteMsg; } -void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { +static int32_t mndBuildMsg(SMnodeMsg *pMsg, SRpcMsg *pRpc) { + SRpcConnInfo connInfo = {0}; + if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { + terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; + dError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpc->ahandle, pRpc->handle); + return -1; + } + memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN); + + pMsg->rpcMsg = *pRpc; + pMsg->createdTime = taosGetTimestampSec(); + + char *pCont = (char *)pMsg + sizeof(SMnodeMsg); + memcpy(pCont, pRpc->pCont, pRpc->contLen); + pMsg->rpcMsg = *pRpc; + pMsg->rpcMsg.pCont = pCont; + pMsg->createdTime = taosGetTimestampSec(); + + dTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMsg, pRpc->ahandle, pRpc->handle, pMsg->user); + return 0; +} + +void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = -1; - SMnodeMsg *pMnodeMsg = NULL; + SMnodeMsg *pMsg = NULL; - MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpcMsg->msgType)]; + MndMsgFp msgFp = pMgmt->msgFp[TMSG_INDEX(pRpc->msgType)]; if (msgFp == NULL) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; goto _OVER; } - int32_t contLen = sizeof(SMnodeMsg) + pRpcMsg->contLen; - pMnodeMsg = taosAllocateQitem(contLen); - if (pMnodeMsg == NULL) { + int32_t contLen = sizeof(SMnodeMsg) + pRpc->contLen; + pMsg = taosAllocateQitem(contLen); + if (pMsg == NULL) { goto _OVER; } - if (mndBuildMsg(pMnodeMsg, pRpcMsg) != 0) { + if (mndBuildMsg(pMsg, pRpc) != 0) { goto _OVER; } if (pMgmt->singleProc) { - code = (*msgFp)(pDnode, pMnodeMsg); + code = (*msgFp)(pDnode, pMsg); } else { - code = taosProcPushChild(pMgmt->pProcess, pMnodeMsg, contLen); + code = taosProcPushChild(pMgmt->pProcess, pMsg, contLen); } _OVER: if (code == 0) { if (!pMgmt->singleProc) { - taosFreeQitem(pMnodeMsg); + taosFreeQitem(pMsg); } } else { - bool isReq = (pRpcMsg->msgType & 1U); + bool isReq = (pRpc->msgType & 1U); if (isReq) { if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) { - dndSendRedirectRsp(pDnode, pRpcMsg); + dndSendRedirectRsp(pDnode, pRpc); } else { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = terrno}; + SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; rpcSendResponse(&rsp); } } - taosFreeQitem(pMnodeMsg); + taosFreeQitem(pMsg); } - rpcFreeCont(pRpcMsg->pCont); + rpcFreeCont(pRpc->pCont); } int32_t mmProcessWriteMsg(SDnode *pDnode, SMnodeMsg *pMnodeMsg) { @@ -261,5 +283,5 @@ static void mmConsumeQueue(SDnode *pDnode, SMnodeMsg *pMsg) { mndSendRsp(pMsg, terrno); } - // mndCleanupMsg(pMsg); + taosFreeQitem(pMsg); } diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index c8da62ae16..92c514f359 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -390,28 +390,6 @@ void mndDestroy(const char *path) { mDebug("mnode is destroyed"); } -int32_t mndBuildMsg(SMnodeMsg *pMnodeMsg, SRpcMsg *pRpcMsg) { - if (pRpcMsg->msgType != TDMT_MND_TRANS_TIMER && pRpcMsg->msgType != TDMT_MND_MQ_TIMER && - pRpcMsg->msgType != TDMT_MND_MQ_DO_REBALANCE && pRpcMsg->msgType != TDMT_MND_TELEM_TIMER) { - SRpcConnInfo connInfo = {0}; - if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { - terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; - mError("failed to create msg since %s, app:%p RPC:%p", terrstr(), pRpcMsg->ahandle, pRpcMsg->handle); - return -1; - } - memcpy(pMnodeMsg->user, connInfo.user, TSDB_USER_LEN); - } - - pMnodeMsg->rpcMsg = *pRpcMsg; - pMnodeMsg->createdTime = taosGetTimestampSec(); - pMnodeMsg->pCont = (char*)pMnodeMsg + sizeof(pMnodeMsg); - - if (pRpcMsg != NULL) { - mTrace("msg:%p, is created, app:%p RPC:%p user:%s", pMnodeMsg, pRpcMsg->ahandle, pRpcMsg->handle, pMnodeMsg->user); - } - return 0; -} - void mndSendRsp(SMnodeMsg *pMsg, int32_t code) { SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code}; rpcSendResponse(&rpcRsp);