diff --git a/src/dnode/inc/dnodeMgmt.h b/src/dnode/inc/dnodeMgmt.h index 28844ba0e5..1a788610e5 100644 --- a/src/dnode/inc/dnodeMgmt.h +++ b/src/dnode/inc/dnodeMgmt.h @@ -32,7 +32,7 @@ void* dnodeGetVnodeWal(void *pVnode); void* dnodeGetVnodeTsdb(void *pVnode); void dnodeReleaseVnode(void *pVnode); -void dnodeSendRedirectMsg(int32_t msgType, void *thandle, bool forShell); +void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell); void dnodeGetMnodeIpSetForPeer(void *ipSet); void dnodeGetMnodeIpSetForShell(void *ipSe); diff --git a/src/dnode/src/dnodeMPeer.c b/src/dnode/src/dnodeMPeer.c index dec4f5ef59..46e7a0a7e2 100644 --- a/src/dnode/src/dnodeMPeer.c +++ b/src/dnode/src/dnodeMPeer.c @@ -111,7 +111,7 @@ void dnodeFreeMnodePqueue() { void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMPeerQueue == NULL) { - dnodeSendRedirectMsg(pMsg->msgType, pMsg->handle, false); + dnodeSendRedirectMsg(pMsg, false); return; } @@ -120,18 +120,23 @@ void dnodeDispatchToMnodePeerQueue(SRpcMsg *pMsg) { taosWriteQitem(tsMPeerQueue, TAOS_QTYPE_RPC, pPeer); } +static void dnodeFreeMnodePeadMsg(SMnodeMsg *pPeer) { + mnodeCleanupMsg(pPeer); + taosFreeQitem(pPeer); +} + static void dnodeSendRpcMnodePeerRsp(SMnodeMsg *pPeer, int32_t code) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; SRpcMsg rpcRsp = { - .handle = pPeer->thandle, + .handle = pPeer->rpcMsg.handle, .pCont = pPeer->rpcRsp.rsp, .contLen = pPeer->rpcRsp.len, .code = code, }; rpcSendResponse(&rpcRsp); - mnodeCleanupMsg(pPeer); + dnodeFreeMnodePeadMsg(pPeer); } static void *dnodeProcessMnodePeerQueue(void *param) { @@ -145,10 +150,9 @@ static void *dnodeProcessMnodePeerQueue(void *param) { break; } - dTrace("%p, msg:%s will be processed in mpeer queue", pPeerMsg->ahandle, taosMsg[pPeerMsg->msgType]); + dTrace("%p, msg:%s will be processed in mpeer queue", pPeerMsg->rpcMsg.ahandle, taosMsg[pPeerMsg->rpcMsg.msgType]); int32_t code = mnodeProcessPeerReq(pPeerMsg); dnodeSendRpcMnodePeerRsp(pPeerMsg, code); - taosFreeQitem(pPeerMsg); } return NULL; diff --git a/src/dnode/src/dnodeMRead.c b/src/dnode/src/dnodeMRead.c index 2ab5f48a9a..9a977ffe83 100644 --- a/src/dnode/src/dnodeMRead.c +++ b/src/dnode/src/dnodeMRead.c @@ -116,7 +116,7 @@ void dnodeFreeMnodeRqueue() { void dnodeDispatchToMnodeReadQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMReadQueue == NULL) { - dnodeSendRedirectMsg(pMsg->msgType, pMsg->handle, true); + dnodeSendRedirectMsg(pMsg, true); return; } @@ -125,18 +125,23 @@ void dnodeDispatchToMnodeReadQueue(SRpcMsg *pMsg) { taosWriteQitem(tsMReadQueue, TAOS_QTYPE_RPC, pRead); } +static void dnodeFreeMnodeReadMsg(SMnodeMsg *pRead) { + mnodeCleanupMsg(pRead); + taosFreeQitem(pRead); +} + static void dnodeSendRpcMnodeReadRsp(SMnodeMsg *pRead, int32_t code) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; SRpcMsg rpcRsp = { - .handle = pRead->thandle, + .handle = pRead->rpcMsg.handle, .pCont = pRead->rpcRsp.rsp, .contLen = pRead->rpcRsp.len, .code = code, }; rpcSendResponse(&rpcRsp); - mnodeCleanupMsg(pRead); + dnodeFreeMnodeReadMsg(pRead); } static void *dnodeProcessMnodeReadQueue(void *param) { @@ -150,10 +155,9 @@ static void *dnodeProcessMnodeReadQueue(void *param) { break; } - dTrace("%p, msg:%s will be processed in mread queue", pReadMsg->ahandle, taosMsg[pReadMsg->msgType]); + dTrace("%p, msg:%s will be processed in mread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]); int32_t code = mnodeProcessRead(pReadMsg); - dnodeSendRpcMnodeReadRsp(pReadMsg, code); - taosFreeQitem(pReadMsg); + dnodeSendRpcMnodeReadRsp(pReadMsg, code); } return NULL; diff --git a/src/dnode/src/dnodeMWrite.c b/src/dnode/src/dnodeMWrite.c index 89c44d829b..b54d295d05 100644 --- a/src/dnode/src/dnodeMWrite.c +++ b/src/dnode/src/dnodeMWrite.c @@ -113,7 +113,7 @@ void dnodeFreeMnodeWqueue() { void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) { if (!mnodeIsRunning() || tsMWriteQueue == NULL) { - dnodeSendRedirectMsg(pMsg->msgType, pMsg->handle, true); + dnodeSendRedirectMsg(pMsg, true); return; } @@ -122,19 +122,24 @@ void dnodeDispatchToMnodeWriteQueue(SRpcMsg *pMsg) { taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); } +static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) { + mnodeCleanupMsg(pWrite); + taosFreeQitem(pWrite); +} + void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) { SMnodeMsg *pWrite = pRaw; if (code == TSDB_CODE_ACTION_IN_PROGRESS) return; SRpcMsg rpcRsp = { - .handle = pWrite->thandle, + .handle = pWrite->rpcMsg.handle, .pCont = pWrite->rpcRsp.rsp, .contLen = pWrite->rpcRsp.len, .code = code, }; rpcSendResponse(&rpcRsp); - mnodeCleanupMsg(pWrite); + dnodeFreeMnodeWriteMsg(pWrite); } static void *dnodeProcessMnodeWriteQueue(void *param) { @@ -148,26 +153,19 @@ static void *dnodeProcessMnodeWriteQueue(void *param) { break; } - dTrace("%p, msg:%s will be processed in mwrite queue", pWriteMsg->ahandle, taosMsg[pWriteMsg->msgType]); + dTrace("%p, msg:%s will be processed in mwrite queue", pWriteMsg->rpcMsg.ahandle, taosMsg[pWriteMsg->rpcMsg.msgType]); int32_t code = mnodeProcessWrite(pWriteMsg); - dnodeSendRpcMnodeWriteRsp(pWriteMsg, code); - taosFreeQitem(pWriteMsg); + dnodeSendRpcMnodeWriteRsp(pWriteMsg, code); } return NULL; } -static void dnodeFreeMnodeWriteMsg(void *pMsg) { - SMnodeMsg *pWrite = pMsg; - mnodeCleanupMsg(pWrite); - taosFreeQitem(pWrite); -} - void dnodeReprocessMnodeWriteMsg(void *pMsg) { SMnodeMsg *pWrite = pMsg; if (!mnodeIsRunning() || tsMWriteQueue == NULL) { - dnodeSendRedirectMsg(pWrite->msgType, pWrite->thandle, true); + dnodeSendRedirectMsg(pMsg, true); dnodeFreeMnodeWriteMsg(pWrite); } else { taosWriteQitem(tsMWriteQueue, TAOS_QTYPE_RPC, pWrite); diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index ec7ff4c66c..31e73eafbc 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -609,9 +609,9 @@ int32_t dnodeGetDnodeId() { return tsDnodeCfg.dnodeId; } -void dnodeSendRedirectMsg(int32_t msgType, void *thandle, bool forShell) { +void dnodeSendRedirectMsg(SRpcMsg *rpcMsg, bool forShell) { SRpcConnInfo connInfo; - rpcGetConnInfo(thandle, &connInfo); + rpcGetConnInfo(rpcMsg->handle, &connInfo); SRpcIpSet ipSet = {0}; if (forShell) { @@ -620,7 +620,7 @@ void dnodeSendRedirectMsg(int32_t msgType, void *thandle, bool forShell) { dnodeGetMnodeIpSetForPeer(&ipSet); } - dTrace("msg:%s will be redirected, dnodeIp:%s user:%s, numOfIps:%d inUse:%d", taosMsg[msgType], + dTrace("msg:%s will be redirected, dnodeIp:%s user:%s, numOfIps:%d inUse:%d", taosMsg[rpcMsg->msgType], taosIpStr(connInfo.clientIp), connInfo.user, ipSet.numOfIps, ipSet.inUse); for (int i = 0; i < ipSet.numOfIps; ++i) { @@ -628,5 +628,5 @@ void dnodeSendRedirectMsg(int32_t msgType, void *thandle, bool forShell) { ipSet.port[i] = htons(ipSet.port[i]); } - rpcSendRedirectRsp(thandle, &ipSet); + rpcSendRedirectRsp(rpcMsg->handle, &ipSet); }