From 7341f68cbe8330f472ad2ec00714a2f2f6a1fdb8 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 May 2022 16:56:50 +0800 Subject: [PATCH 1/2] refactor: adjust msgcb --- source/dnode/mgmt/mgmt_bnode/inc/bmInt.h | 6 ++--- source/dnode/mgmt/mgmt_bnode/src/bmHandle.c | 14 ++++------- source/dnode/mgmt/mgmt_mnode/inc/mmInt.h | 16 ++++++------- source/dnode/mgmt/mgmt_mnode/src/mmFile.c | 8 +++---- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 24 +++++++------------ source/dnode/mgmt/mgmt_mnode/src/mmInt.c | 8 +++---- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 2 +- source/dnode/mgmt/mgmt_qnode/inc/qmInt.h | 2 +- source/dnode/mgmt/mgmt_qnode/src/qmHandle.c | 14 ++++------- source/dnode/mgmt/mgmt_snode/inc/smInt.h | 2 +- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 14 ++++------- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 8 +++---- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 18 +++++++------- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 18 +++++++------- 14 files changed, 68 insertions(+), 86 deletions(-) diff --git a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h index 5ee4d48f9a..c05ad46189 100644 --- a/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h +++ b/source/dnode/mgmt/mgmt_bnode/inc/bmInt.h @@ -36,9 +36,9 @@ typedef struct SBnodeMgmt { // bmHandle.c SArray *bmGetMsgHandles(); -int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq); -int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pReq); -int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); +int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); +int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg); // bmWorker.c int32_t bmStartWorker(SBnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c index d2e547078e..9ec445c69c 100644 --- a/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c +++ b/source/dnode/mgmt/mgmt_bnode/src/bmHandle.c @@ -18,7 +18,7 @@ void bmGetMonitorInfo(SBnodeMgmt *pMgmt, SMonBmInfo *bmInfo) {} -int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonBmInfo bmInfo = {0}; bmGetMonitorInfo(pMgmt, &bmInfo); dmGetMonitorSystemInfo(&bmInfo.sys); @@ -37,17 +37,15 @@ int32_t bmProcessGetMonBmInfoReq(SBnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonBmInfo(pRsp, rspLen, &bmInfo); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonBmInfo(&bmInfo); return 0; } int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDCreateBnodeReq createReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -68,10 +66,8 @@ int32_t bmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t bmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDDropBnodeReq dropReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h index 648dc217dc..75e83d6547 100644 --- a/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h +++ b/source/dnode/mgmt/mgmt_mnode/inc/mmInt.h @@ -41,18 +41,18 @@ typedef struct SMnodeMgmt { // mmFile.c int32_t mmReadFile(SMnodeMgmt *pMgmt, bool *pDeployed); -int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed); +int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed); // mmInt.c -int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq); +int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg); // mmHandle.c SArray *mmGetMsgHandles(); int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); -int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); // mmWorker.c int32_t mmStartWorker(SMnodeMgmt *pMgmt); @@ -62,10 +62,10 @@ int32_t mmPutNodeMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t mmPutNodeMsgToMonitorQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); -int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); -int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); -int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); -int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pRpc); +int32_t mmPutRpcMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutRpcMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c index 0bab05f973..2aa1087770 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmFile.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmFile.c @@ -104,7 +104,7 @@ _OVER: return code; } -int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) { +int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pMsg, bool deployed) { char file[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); @@ -124,11 +124,11 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, SDCreateMnodeReq *pReq, bool deployed) { len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"mnodes\": [{\n"); - int8_t replica = (pReq != NULL ? pReq->replica : pMgmt->replica); + int8_t replica = (pMsg != NULL ? pMsg->replica : pMgmt->replica); for (int32_t i = 0; i < replica; ++i) { SReplica *pReplica = &pMgmt->replicas[i]; - if (pReq != NULL) { - pReplica = &pReq->replicas[i]; + if (pMsg != NULL) { + pReplica = &pMsg->replicas[i]; } len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pReplica->id); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pReplica->fqdn); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 8b2d8f8bad..2ce42d7a5f 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -25,7 +25,7 @@ void mmGetMnodeLoads(SMnodeMgmt *pMgmt, SMonMloadInfo *pInfo) { mndGetLoad(pMgmt->pMnode, &pInfo->load); } -int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonMmInfo mmInfo = {0}; mmGetMonitorInfo(pMgmt, &mmInfo); dmGetMonitorSystemInfo(&mmInfo.sys); @@ -44,13 +44,13 @@ int32_t mmProcessGetMonitorInfoReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonMmInfo(pRsp, rspLen, &mmInfo); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonMmInfo(&mmInfo); return 0; } -int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonMloadInfo mloads = {0}; mmGetMnodeLoads(pMgmt, &mloads); @@ -67,16 +67,14 @@ int32_t mmProcessGetLoadsReq(SMnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonMloadInfo(pRsp, rspLen, &mloads); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; return 0; } int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDCreateMnodeReq createReq = {0}; - if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -101,10 +99,8 @@ int32_t mmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDDropMnodeReq dropReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -129,10 +125,8 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDAlterMnodeReq alterReq = {0}; - if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) { + if (tDeserializeSDCreateMnodeReq(pMsg->pCont, pMsg->contLen, &alterReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c index a24334550b..4f7fd4a1c0 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmInt.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmInt.c @@ -87,9 +87,9 @@ static int32_t mmBuildOptionFromReq(SMnodeMgmt *pMgmt, SMnodeOpt *pOption, SDCre return 0; } -int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { +int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pMsg) { SMnodeOpt option = {0}; - if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) { + if (mmBuildOptionFromReq(pMgmt, &option, pMsg) != 0) { return -1; } @@ -98,7 +98,7 @@ int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { } bool deployed = true; - if (mmWriteFile(pMgmt, pReq, deployed) != 0) { + if (mmWriteFile(pMgmt, pMsg, deployed) != 0) { dError("failed to write mnode file since %s", terrstr()); return -1; } @@ -135,7 +135,7 @@ static int32_t mmOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)mmPutRpcMsgToQueryQueue; pMgmt->msgCb.queueFps[READ_QUEUE] = (PutToQueueFp)mmPutRpcMsgToReadQueue; pMgmt->msgCb.queueFps[WRITE_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; - pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToWriteQueue; + pMgmt->msgCb.queueFps[SYNC_QUEUE] = (PutToQueueFp)mmPutRpcMsgToSyncQueue; pMgmt->msgCb.mgmt = pMgmt; bool deployed = false; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index 9b9960cf3a..cf76d3a167 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -126,7 +126,7 @@ int32_t mmPutRpcMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->readWorker, pMsg); } -int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); } +int32_t mmPutRpcMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) { return mmPutRpcMsgToWorker(&pMgmt->syncWorker, pMsg); } int32_t mmStartWorker(SMnodeMgmt *pMgmt) { SSingleWorkerCfg qCfg = { diff --git a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h index dd16eee643..9738fb0c45 100644 --- a/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h +++ b/source/dnode/mgmt/mgmt_qnode/inc/qmInt.h @@ -39,7 +39,7 @@ typedef struct SQnodeMgmt { SArray *qmGetMsgHandles(); int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); -int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); // qmWorker.c int32_t qmPutRpcMsgToQueryQueue(SQnodeMgmt *pMgmt, SRpcMsg *pMsg); diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c index ac0868d8ef..c4b1ab63e4 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmHandle.c @@ -18,7 +18,7 @@ void qmGetMonitorInfo(SQnodeMgmt *pMgmt, SMonQmInfo *qmInfo) {} -int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonQmInfo qmInfo = {0}; qmGetMonitorInfo(pMgmt, &qmInfo); dmGetMonitorSystemInfo(&qmInfo.sys); @@ -37,17 +37,15 @@ int32_t qmProcessGetMonitorInfoReq(SQnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonQmInfo(pRsp, rspLen, &qmInfo); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonQmInfo(&qmInfo); return 0; } int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDCreateQnodeReq createReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -68,10 +66,8 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDDropQnodeReq dropReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/mgmt/mgmt_snode/inc/smInt.h b/source/dnode/mgmt/mgmt_snode/inc/smInt.h index 68b6ef659e..fbf63dda43 100644 --- a/source/dnode/mgmt/mgmt_snode/inc/smInt.h +++ b/source/dnode/mgmt/mgmt_snode/inc/smInt.h @@ -40,7 +40,7 @@ typedef struct SSnodeMgmt { SArray *smGetMsgHandles(); int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg); -int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg); // smWorker.c int32_t smStartWorker(SSnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index d6ff73b132..bf1bb145b7 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -18,7 +18,7 @@ void smGetMonitorInfo(SSnodeMgmt *pMgmt, SMonSmInfo *smInfo) {} -int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonSmInfo smInfo = {0}; smGetMonitorInfo(pMgmt, &smInfo); dmGetMonitorSystemInfo(&smInfo.sys); @@ -37,17 +37,15 @@ int32_t smProcessGetMonitorInfoReq(SSnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonSmInfo(pRsp, rspLen, &smInfo); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonSmInfo(&smInfo); return 0; } int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDCreateSnodeReq createReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -68,10 +66,8 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { } int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; - SDDropSnodeReq dropReq = {0}; - if (tDeserializeSCreateDropMQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSCreateDropMQSBNodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 022d6204ef..5ec33fe810 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -84,10 +84,10 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode); // vmHandle.c SArray *vmGetMsgHandles(); -int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); -int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); -int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); -int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq); +int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); // vmFile.c int32_t vmGetVnodeListFromFile(SVnodeMgmt *pMgmt, SWrapperCfg **ppCfgs, int32_t *numOfVnodes); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 602922feeb..3da3d90ae1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -82,7 +82,7 @@ void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { taosArrayDestroy(pVloads); } -int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonVmInfo vmInfo = {0}; vmGetMonitorInfo(pMgmt, &vmInfo); dmGetMonitorSystemInfo(&vmInfo.sys); @@ -101,13 +101,13 @@ int32_t vmProcessGetMonitorInfoReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonVmInfo(pRsp, rspLen, &vmInfo); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonVmInfo(&vmInfo); return 0; } -int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { +int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { SMonVloadInfo vloads = {0}; vmGetVnodeLoads(pMgmt, &vloads); @@ -124,8 +124,8 @@ int32_t vmProcessGetLoadsReq(SVnodeMgmt *pMgmt, SRpcMsg *pReq) { } tSerializeSMonVloadInfo(pRsp, rspLen, &vloads); - pReq->info.rsp = pRsp; - pReq->info.rspLen = rspLen; + pMsg->info.rsp = pRsp; + pMsg->info.rspLen = rspLen; tFreeSMonVloadInfo(&vloads); return 0; } @@ -174,12 +174,11 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW } int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; SCreateVnodeReq createReq = {0}; int32_t code = -1; char path[TSDB_FILENAME_LEN] = {0}; - if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + if (tDeserializeSCreateVnodeReq(pMsg->pCont, pMsg->contLen, &createReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } @@ -242,9 +241,8 @@ _OVER: } int32_t vmProcessDropVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { - SRpcMsg *pReq = pMsg; SDropVnodeReq dropReq = {0}; - if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + if (tDeserializeSDropVnodeReq(pMsg->pCont, pMsg->contLen, &dropReq) != 0) { terrno = TSDB_CODE_INVALID_MSG; return -1; } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index b4c8a0807c..ee9008b198 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -200,12 +200,12 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { return 0; } -static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) { +static void dmSendRpcRedirectRsp(const SRpcMsg *pMsg) { SDnode *pDnode = dmInstance(); SEpSet epSet = {0}; dmGetMnodeEpSet(&pDnode->data, &epSet); - dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->info.handle, epSet.numOfEps, epSet.inUse); + dDebug("RPC %p, req is redirected, num:%d use:%d", pMsg->info.handle, epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); if (strcmp(epSet.eps[i].fqdn, tsLocalFqdn) == 0 && epSet.eps[i].port == tsServerPort) { @@ -220,12 +220,14 @@ static void dmSendRpcRedirectRsp(const SRpcMsg *pReq) { SRpcMsg rsp = { .code = TSDB_CODE_RPC_REDIRECT, - .info = pReq->info, + .info = pMsg->info, .contLen = len, }; rsp.pCont = rpcMallocCont(len); tSerializeSMEpSet(rsp.pCont, len, &msg); rpcSendResponse(&rsp); + + rpcFreeCont(pMsg->pCont); } static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { @@ -239,16 +241,16 @@ static inline void dmSendRecv(SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { } } -static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pReq) { +static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) { SDnode *pDnode = dmInstance(); if (pDnode->status != DND_STAT_RUNNING) { - rpcFreeCont(pReq->pCont); - pReq->pCont = NULL; + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; terrno = TSDB_CODE_NODE_OFFLINE; - dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->info.handle); + dError("failed to send rpc msg since %s, handle:%p", terrstr(), pMsg->info.handle); return -1; } else { - rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL); + rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pMsg, NULL); return 0; } } From 366d0f6d08427607341011ebe7591322d775005c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 19 May 2022 17:39:24 +0800 Subject: [PATCH 2/2] refactor: adjust msgcb --- source/dnode/mnode/impl/src/mndDnode.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 5fdd2f1842..2a4cf01115 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -624,14 +624,12 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) { SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .info = pReq->info}; - mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.info.ahandle, cfgReq.config); - tmsgSendReq(&epSet, &rpcMsg); - - return 0; + mDebug("dnode:%d, send config req to dnode, app:%p", cfgReq.dnodeId, rpcMsg.info.ahandle); + return tmsgSendReq(&epSet, &rpcMsg); } static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) { - mInfo("app:%p config rsp from dnode", pRsp->info.ahandle); + mDebug("config rsp from dnode, app:%p", pRsp->info.ahandle); return TSDB_CODE_SUCCESS; }