From 2a5e231fd2229262b3f0b157aec34a47a038b1be Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 15 Mar 2022 14:34:00 +0800 Subject: [PATCH] shm --- source/dnode/mgmt/container/src/dndNode.c | 5 +++-- source/dnode/mgmt/container/src/dndTransport.c | 8 ++++---- source/dnode/mgmt/dnode/src/dmMsg.c | 2 +- source/dnode/mgmt/dnode/src/dmWorker.c | 3 ++- source/dnode/mgmt/main/inc/dndMain.h | 6 +++--- source/dnode/mgmt/mnode/src/mmWorker.c | 5 ++++- 6 files changed, 17 insertions(+), 12 deletions(-) diff --git a/source/dnode/mgmt/container/src/dndNode.c b/source/dnode/mgmt/container/src/dndNode.c index db8785662d..82130660ab 100644 --- a/source/dnode/mgmt/container/src/dndNode.c +++ b/source/dnode/mgmt/container/src/dndNode.c @@ -77,6 +77,7 @@ SDnode *dndCreate(SDndCfg *pCfg) { goto _OVER; } + memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg)); dndSetStatus(pDnode, DND_STAT_INIT); pDnode->rebootTime = taosGetTimestampMs(); pDnode->pLockFile = dndCheckRunning(pCfg->dataDir); @@ -100,7 +101,6 @@ SDnode *dndCreate(SDndCfg *pCfg) { qmGetMgmtFp(&pDnode->wrappers[QNODE]); smGetMgmtFp(&pDnode->wrappers[SNODE]); bmGetMgmtFp(&pDnode->wrappers[BNODE]); - memcpy(&pDnode->cfg, pCfg, sizeof(SDndCfg)); if (dndInitMsgHandle(pDnode) != 0) { goto _OVER; @@ -306,12 +306,13 @@ void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) { goto _OVER; } - dTrace("msg:%p, is created, user:%s", pMsg, pMsg->user); + dTrace("msg:%p, is created, app:%p user:%s", pMsg, pRpc->ahandle, pMsg->user); code = (*msgFp)(pWrapper, pMsg); _OVER: if (code != 0) { + dError("msg:%p, failed to process since %s", pMsg, terrstr()); bool isReq = (pRpc->msgType & 1U); if (isReq) { SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; diff --git a/source/dnode/mgmt/container/src/dndTransport.c b/source/dnode/mgmt/container/src/dndTransport.c index b6db12d62d..196bdb165b 100644 --- a/source/dnode/mgmt/container/src/dndTransport.c +++ b/source/dnode/mgmt/container/src/dndTransport.c @@ -37,8 +37,8 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)]; if (pHandle->msgFp != NULL) { - dTrace("rsp:%s will be processed by %s, code:0x%x app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name, - pRsp->code & 0XFFFF, pRsp->ahandle); + dTrace("rsp:%s will be processed by %s, app:%p code:0x%x:%s", TMSG_INFO(msgType), pHandle->pWrapper->name, + pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code)); dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet); } else { dError("rsp:%s not processed, app:%p", TMSG_INFO(msgType), pRsp->ahandle); @@ -51,7 +51,7 @@ int32_t dndInitClient(SDnode *pDnode) { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.label = "CLI"; + rpcInit.label = "DND"; rpcInit.numOfThreads = 1; rpcInit.cfp = dndProcessResponse; rpcInit.sessions = 1024; @@ -218,7 +218,7 @@ int32_t dndInitServer(SDnode *pDnode) { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = pDnode->cfg.serverPort; - rpcInit.label = "SRV"; + rpcInit.label = "DND"; rpcInit.numOfThreads = numOfThreads; rpcInit.cfp = dndProcessRequest; rpcInit.sessions = tsMaxShellConns; diff --git a/source/dnode/mgmt/dnode/src/dmMsg.c b/source/dnode/mgmt/dnode/src/dmMsg.c index be70a42101..5879e29b56 100644 --- a/source/dnode/mgmt/dnode/src/dmMsg.c +++ b/source/dnode/mgmt/dnode/src/dmMsg.c @@ -54,7 +54,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; pMgmt->statusSent = 1; - dTrace("send status req to mnode, ahandle:%p", rpcMsg.ahandle); + dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle); dndSendReqToMnode(pMgmt->pDnode, &rpcMsg); } diff --git a/source/dnode/mgmt/dnode/src/dmWorker.c b/source/dnode/mgmt/dnode/src/dmWorker.c index 7747efe604..80fa34839e 100644 --- a/source/dnode/mgmt/dnode/src/dmWorker.c +++ b/source/dnode/mgmt/dnode/src/dmWorker.c @@ -130,6 +130,7 @@ static void dmProcessMgmtQueue(SDnode *pDnode, SNodeMsg *pNodeMsg) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; taosFreeQitem(pNodeMsg); + dTrace("msg:%p, is freed", pNodeMsg); } int32_t dmStartWorker(SDnodeMgmt *pMgmt) { @@ -174,5 +175,5 @@ int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { } dTrace("msg:%p, will be written to worker %s", pMsg, pWorker->name); - return dndWriteMsgToWorker(pWorker, pMsg, sizeof(SNodeMsg)); + return dndWriteMsgToWorker(pWorker, pMsg, 0); } \ No newline at end of file diff --git a/source/dnode/mgmt/main/inc/dndMain.h b/source/dnode/mgmt/main/inc/dndMain.h index eb4549be69..72aedbe338 100644 --- a/source/dnode/mgmt/main/inc/dndMain.h +++ b/source/dnode/mgmt/main/inc/dndMain.h @@ -14,8 +14,8 @@ * along with this program. If not, see . */ -#ifndef _TD_DND_EXEC_H_ -#define _TD_DND_EXEC_H_ +#ifndef _TD_DND_MAIN_H_ +#define _TD_DND_MAIN_H_ #include "dnode.h" @@ -45,4 +45,4 @@ SDndCfg dndGetCfg(); } #endif -#endif /*_TD_DND_EXEC_H_*/ +#endif /*_TD_DND_MAIN_H_*/ diff --git a/source/dnode/mgmt/mnode/src/mmWorker.c b/source/dnode/mgmt/mnode/src/mmWorker.c index 8b8b472f1b..3ca112782f 100644 --- a/source/dnode/mgmt/mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mnode/src/mmWorker.c @@ -270,4 +270,7 @@ static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg) { int32_t mmProcessWriteMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;} int32_t mmProcessSyncMsg( SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;} -int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {return 0;} \ No newline at end of file +int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + return -1; +} \ No newline at end of file