From 54fc2bfe9e5639640420a4f38b826936af6eba1f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 23 Sep 2024 18:51:18 +0800 Subject: [PATCH] fix mem leak --- source/libs/transport/inc/transComm.h | 5 --- source/libs/transport/src/transCli.c | 47 ++++++++++++++++++--------- source/libs/transport/src/transComm.c | 33 ------------------- 3 files changed, 31 insertions(+), 54 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index e75cd51fc0..0e809c06b0 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -350,11 +350,6 @@ typedef struct SWriteReq { void* conn; } SWriteReq; -void transReqQueueInit(queue* q); -void* transReqQueuePush(queue* q, SWriteReq* req); -void* transReqQueueRemove(void* arg); -void transReqQueueClear(queue* q); - // queue sending msgs typedef struct { queue node; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b4a998c88d..f02d5206db 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -110,15 +110,15 @@ typedef struct SCliConn { queue wq; // uv_write_t queue } SCliConn; -// #define TRANS_CONN_REF_INC(tconn) ((tconn) ? (tconn)->ref++ : 0) -// #define TRANS_CONN_REF_DEC(tconn) ((tconn) ? (tconn)->ref-- : 0) -// #define TRANS_CONN_REF_GET(tconn) ((tconn) ? (tconn)->ref : 0) - typedef struct { SCliConn* conn; void* arg; } SReqState; +typedef struct { + int64_t seq; + int32_t msgType; +} SFiterArg; typedef struct SCliReq { SReqCtx* ctx; queue q; @@ -282,7 +282,13 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); static void cliWalkCb(uv_handle_t* handle, void* arg); static FORCE_INLINE int32_t destroyAllReqs(SCliConn* SCliConn); -static FORCE_INLINE bool filterAllReq(void* e, void* arg); + +static FORCE_INLINE bool filterAllReq(void* e, void* arg); +static FORCE_INLINE bool filerBySeq(void* key, void* arg); +static FORCE_INLINE bool filterByQid(void* key, void* arg); +static FORCE_INLINE bool filterToDebug_timeoutMsg(void* key, void* arg); +static FORCE_INLINE bool filterToRmTimoutReq(void* key, void* arg); + typedef struct { void* p; HeapNode node; @@ -387,8 +393,6 @@ void cliConnMayUpdateTimer(SCliConn* conn, int timeout) { (void)uv_timer_start(conn->timer, cliConnTimeout__checkReq, timeout, 0); } -void cliHandleBatchResp(SCliConn* conn) { return; } - void destroyCliConnQTable(SCliConn* conn) { void* pIter = taosHashIterate(conn->pQTable, NULL); while (pIter != NULL) { @@ -404,12 +408,7 @@ void destroyCliConnQTable(SCliConn* conn) { conn->pQTable = NULL; } -typedef struct { - int64_t seq; - int32_t msgType; -} SFiterArg; - -bool filteBySeq(void* key, void* arg) { +static bool filteBySeq(void* key, void* arg) { SFiterArg* targ = arg; SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); if (pReq->seq == targ->seq && pReq->msg.msgType + 1 == targ->msgType) { @@ -632,6 +631,7 @@ void cliConnTimeout(uv_timer_t* handle) { cliResetConnTimer(conn); return; } + tTrace("%s conn %p conn timeout", CONN_GET_INST_LABEL(conn), conn); TAOS_UNUSED(transUnrefCliHandle(conn)); } @@ -642,8 +642,22 @@ bool filterToRmTimoutReq(void* key, void* arg) { int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000); if (elapse > READ_TIMEOUT) { return true; + } else { + return false; } - return true; + } + return false; +} + +bool filterToDebug_timeoutMsg(void* key, void* arg) { + SCliReq* pReq = QUEUE_DATA(key, SCliReq, q); + if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) { + int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000); + if (elapse > READ_TIMEOUT) { + tWarn("req %s timeout, elapse:%" PRId64 "ms", TMSG_INFO(pReq->msg.msgType), elapse); + return false; + } + return false; } return false; } @@ -655,6 +669,8 @@ void cliConnCheckTimoutMsg(SCliConn* conn) { SCliThrd* pThrd = conn->hostThrd; STrans* pInst = pThrd->pInst; + transQueueRemoveByFilter(&conn->reqsSentOut, filterToDebug_timeoutMsg, NULL, &set, -1); + if (pInst->startReadTimer == 0) { return; } @@ -663,6 +679,7 @@ void cliConnCheckTimoutMsg(SCliConn* conn) { return; } + QUEUE_INIT(&set); transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, NULL, &set, -1); while (!QUEUE_IS_EMPTY(&set)) { @@ -1177,12 +1194,10 @@ static void cliBatchSendCb(uv_write_t* req, int status) { } cliConnMayUpdateTimer(conn, READ_TIMEOUT); - // if (!uv_is_readable(conn->stream)) { if (conn->readerStart == 0) { (void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); conn->readerStart = 1; } - //} if (!cliMayRecycleConn(conn)) { (void)cliBatchSend(conn); diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index cdd8eea12c..a7ff04cc58 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -413,39 +413,6 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) { return ret; } -void transReqQueueInit(queue* q) { - // init req queue - QUEUE_INIT(q); -} -void* transReqQueuePush(queue* q, SWriteReq* userReq) { - return NULL; - // uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t)); - // req->data = userReq; - - // QUEUE_PUSH(q, &userReq->q); - // return req; -} -void* transReqQueueRemove(void* arg) { - return NULL; - // void* ret = NULL; - // uv_write_t* req = arg; - - // SWriteReq* userReq = req ? req->data : NULL; - // if (req == NULL) return NULL; - // QUEUE_REMOVE(&userReq->q); - - // return userReq; -} -void transReqQueueClear(queue* q) { - return; - // while (!QUEUE_IS_EMPTY(q)) { - // queue* h = QUEUE_HEAD(q); - // QUEUE_REMOVE(h); - // SWriteReq* req = QUEUE_DATA(h, SWriteReq, q); - // taosMemoryFree(req); - // } -} - int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) { QUEUE_INIT(&wq->node); wq->freeFunc = (void (*)(void*))freeFunc;