From ad1609fe7f535fd4b6f01751f03360eb21e3e774 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 11 Mar 2022 18:18:45 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h | 2 +- source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c | 4 +- .../dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c | 66 ++++++++++++------- source/util/src/tprocess.c | 4 +- 4 files changed, 48 insertions(+), 28 deletions(-) diff --git a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h index 57bb95880d..d882a3bbc9 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h +++ b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h @@ -53,7 +53,7 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); +void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c index e03384f9ad..8a072ce63f 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c @@ -130,8 +130,8 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, .parentQueueSize = 1024 * 1024, .parentConsumeFp = (ProcConsumeFp)mmConsumeParentQueue, - .parentdMallocHeadFp = (ProcMallocFp)taosAllocateQitem, - .parentFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .parentdMallocHeadFp = (ProcMallocFp)malloc, + .parentFreeHeadFp = (ProcFreeFp)free, .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, .testFlag = true, diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c index 307235b728..22ff2088a5 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c @@ -139,6 +139,14 @@ void mmInitMsgFp(SMndMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_STB_RSP)] = mmProcessWriteMsg; } +static void mmSendRpcRsp(SDnode *pDnode, SRpcMsg *pRpc) { + if (pRpc->code == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || pRpc->code == TSDB_CODE_APP_NOT_READY) { + dndSendRedirectRsp(pDnode, pRpc); + } else { + rpcSendResponse(pRpc); + } +} + static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) { SRpcConnInfo connInfo = {0}; if ((pRpc->msgType & 1U) && rpcGetConnInfo(pRpc->handle, &connInfo) != 0) { @@ -183,15 +191,16 @@ void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { _OVER: - if (code != 0) { + if (code == 0) { + if (!pMgmt->singleProc) { + taosFreeQitem(pMsg); + rpcFreeCont(pRpc->pCont); + } + } else { bool isReq = (pRpc->msgType & 1U); if (isReq) { - if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) { - dndSendRedirectRsp(pDnode, pRpc); - } else { - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; - rpcSendResponse(&rsp); - } + SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; + mmSendRpcRsp(pDnode, &rsp); } taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); @@ -245,6 +254,22 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs return code; } +void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) { + SMndMgmt *pMgmt = &pDnode->mmgmt; + int32_t code = -1; + + if (pMgmt->singleProc) { + mmSendRpcRsp(pDnode, pRpc); + } else { + do { + code = taosProcPutToParentQueue(pMgmt->pProcess, pRpc, sizeof(SRpcMsg), pRpc->pCont, pRpc->contLen); + if (code != 0) { + taosMsleep(10); + } + } while (code != 0); + } +} + void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { SMndMgmt *pMgmt = &pDnode->mmgmt; @@ -257,25 +282,23 @@ void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pC if (code != 0) { bool isReq = (pRpc->msgType & 1U); if (isReq) { - if (terrno == TSDB_CODE_DND_MNODE_NOT_DEPLOYED || terrno == TSDB_CODE_APP_NOT_READY) { - dndSendRedirectRsp(pDnode, pRpc); - } else { - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; - rpcSendResponse(&rsp); - } + SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; + mmPutRpcRspToWorker(pDnode, &rsp); } taosFreeQitem(pMsg); rpcFreeCont(pCont); } } -void mmConsumeParentQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) {} +void mmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { + pMsg->pCont = pCont; + mmSendRpcRsp(pDnode, pMsg); + free(pMsg); +} static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { SMnode *pMnode = mmAcquire(pDnode); SRpcMsg *pRpc = &pMsg->rpcMsg; - tmsg_t msgType = pMsg->rpcMsg.msgType; - void *ahandle = pMsg->rpcMsg.ahandle; bool isReq = (pRpc->msgType & 1U); int32_t code = -1; @@ -289,18 +312,15 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { if (pMsg->rpcMsg.handle == NULL) return; if (code == 0) { SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont}; - rpcSendResponse(&rsp); + mmPutRpcRspToWorker(pDnode, &rsp); } else { - if (terrno == TSDB_CODE_APP_NOT_READY) { - dndSendRedirectRsp(pDnode, pRpc); - } else if (terrno == TSDB_CODE_MND_ACTION_IN_PROGRESS) { - } else { + if (terrno != TSDB_CODE_MND_ACTION_IN_PROGRESS) { SRpcMsg rsp = {.handle = pRpc->handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = terrno}; - rpcSendResponse(&rsp); + mmPutRpcRspToWorker(pDnode, &rsp); } } } - taosFreeQitem(pMsg); rpcFreeCont(pRpc->pCont); + taosFreeQitem(pMsg); } diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 9eca5915db..2876cfba01 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -149,8 +149,8 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea pthread_mutex_unlock(&pQueue->mutex); tsem_post(&pQueue->sem); - (*pQueue->freeHeadFp)(pHead); - (*pQueue->freeBodyFp)(pBody); + // (*pQueue->freeHeadFp)(pHead); + // (*pQueue->freeBodyFp)(pBody); return 0; }