From eb3a9a53abd00bd856c4fa00c2956f093d6acc6b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 5 Jul 2022 20:03:27 +0800 Subject: [PATCH] fix rpc mem leak --- include/common/tmsg.h | 8 +++-- include/common/tmsgdef.h | 7 ++++ source/dnode/mgmt/node_mgmt/src/dmMgmt.c | 1 + source/libs/transport/inc/transComm.h | 16 ++++----- source/libs/transport/src/transSvr.c | 44 +++++++++++++++--------- 5 files changed, 49 insertions(+), 27 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 110d9af45c..dedc06a2b9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -55,8 +55,12 @@ extern int32_t tMsgDict[]; #define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8) #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) -#define TMSG_INFO(TYPE) \ - (((TYPE) >= 0 && (TYPE) < TDMT_MAX) ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] : 0) +#define TMSG_INFO(TYPE) \ + ((TYPE) >= 0 && \ + ((TYPE) < TDMT_DND_MAX_MSG | (TYPE) < TDMT_MND_MAX_MSG | (TYPE) < TDMT_VND_MAX_MSG | (TYPE) < TDMT_SCH_MAX_MSG | \ + (TYPE) < TDMT_STREAM_MAX_MSG | (TYPE) < TDMT_MON_MAX_MSG | (TYPE) < TDMT_SYNC_MAX_MSG)) \ + ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ + : 0 #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) typedef uint16_t tmsg_t; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 371e079913..2130a9c264 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -82,6 +82,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) @@ -164,6 +165,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) @@ -198,6 +200,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL) @@ -209,6 +212,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) @@ -217,6 +221,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL) @@ -227,6 +232,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SYNC_MSG) TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL) @@ -251,6 +257,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) #if defined(TD_MSG_NUMBER_) TDMT_MAX diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index ee27f27f06..d70ed09920 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -212,6 +212,7 @@ void dmCleanupDnode(SDnode *pDnode) { dmCleanupClient(pDnode); dmCleanupServer(pDnode); dmClearVars(pDnode); + rpcCleanup(); dDebug("dnode is closed, ptr:%p", pDnode); } diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5ce073081d..f699df6883 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -96,8 +96,8 @@ typedef void* queue[2]; #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit -#define TRANS_RETRY_INTERVAL 15 // ms retry interval -#define TRANS_CONN_TIMEOUT 3 // connect timeout +#define TRANS_RETRY_INTERVAL 15 // ms retry interval +#define TRANS_CONN_TIMEOUT 3 // connect timeout typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; @@ -180,18 +180,18 @@ typedef enum { Normal, Quit, Release, Register, Update } STransMsgType; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) -#define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) +#define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) #define rpcIsReq(type) (type & 1U) #define TRANS_RESERVE_SIZE (sizeof(STranConnCtx)) -#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) -#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) -#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) +#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) +#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) +#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) #define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead)) -#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); -#define transIsReq(type) (type & 1U) +#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); +#define transIsReq(type) (type & 1U) #define transLabel(trans) ((STrans*)trans)->label diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 4c2af32be3..30dd7bbf33 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -241,18 +241,19 @@ static void uvHandleReq(SSvrConn* pConn) { tDebug("conn %p acquired by server app", pConn); } } + STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pHead->traceId; if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pConn), pConn, + tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen); } else { - tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", transLabel(pConn), - pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp, - transMsg.code); + tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", + transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), + ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), + transMsg.contLen, pHead->noResp, transMsg.code); // no ref here } @@ -265,8 +266,8 @@ static void uvHandleReq(SSvrConn* pConn) { transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; - tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pConn), transMsg.info.handle, pConn, - pConn->refId); + tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pTransInst), transMsg.info.handle, + pConn, pConn->refId); assert(transMsg.info.handle != NULL); if (pHead->noResp == 1) { @@ -281,7 +282,6 @@ static void uvHandleReq(SSvrConn* pConn) { transReleaseExHandle(transGetRefMgt(), pConn->refId); - STrans* pTransInst = pConn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); } @@ -290,14 +290,15 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt SSvrConn* conn = cli->data; SConnBuffer* pBuf = &conn->readBuf; + STrans* pTransInst = conn->pTransInst; if (nread > 0) { pBuf->len += nread; - tTrace("%s conn %p total read: %d, current read: %d", transLabel(conn->pTransInst), conn, pBuf->len, (int)nread); + tTrace("%s conn %p total read: %d, current read: %d", transLabel(pTransInst), conn, pBuf->len, (int)nread); if (transReadComplete(pBuf)) { - tTrace("%s conn %p alread read complete packet", transLabel(conn->pTransInst), conn); + tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); uvHandleReq(conn); } else { - tTrace("%s conn %p read partial packet, continue to read", transLabel(conn->pTransInst), conn); + tTrace("%s conn %p read partial packet, continue to read", transLabel(pTransInst), conn); } return; } @@ -305,12 +306,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { return; } - tError("%s conn %p read error: %s", transLabel(conn->pTransInst), conn, uv_err_name(nread)); + tError("%s conn %p read error: %s", transLabel(pTransInst), conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; if (conn->status == ConnAcquire) { if (conn->regArg.init) { - tTrace("%s conn %p broken, notify server app", transLabel(conn->pTransInst), conn); + tTrace("%s conn %p broken, notify server app", transLabel(pTransInst), conn); STrans* pTransInst = conn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); memset(&conn->regArg, 0, sizeof(conn->regArg)); @@ -414,8 +415,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); + STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pMsg->info.traceId; - tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pConn->pTransInst), pConn, + tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pTransInst), pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len); pHead->msgLen = htonl(len); @@ -761,9 +763,10 @@ static SSvrConn* createConn(void* hThrd) { exh->refId = transAddExHandle(transGetRefMgt(), exh); transAcquireExHandle(transGetRefMgt(), exh->refId); + STrans* pTransInst = pThrd->pTransInst; pConn->refId = exh->refId; transRefSrvHandle(pConn); - tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pThrd->pTransInst), exh, pConn, pConn->refId); + tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pTransInst), exh, pConn, pConn->refId); return pConn; } @@ -812,7 +815,13 @@ static void uvDestroyConn(uv_handle_t* handle) { transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); - tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn); + STrans* pTransInst = thrd->pTransInst; + tDebug("%s conn %p destroy", transLabel(pTransInst), conn); + + for (int i = 0; i < transQueueSize(&conn->srvMsgs); i++) { + SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); + destroySmsg(msg); + } transQueueDestroy(&conn->srvMsgs); QUEUE_REMOVE(&conn->queue); @@ -1103,7 +1112,8 @@ void transRegisterMsg(const STransMsg* msg) { m->msg = tmsg; m->type = Register; - tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); + STrans* pTransInst = pThrd->pTransInst; + tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(transGetRefMgt(), refId); return;