fix mem leak

This commit is contained in:
yihaoDeng 2024-09-23 18:51:18 +08:00
parent e12741ed18
commit 54fc2bfe9e
3 changed files with 31 additions and 54 deletions

View File

@ -350,11 +350,6 @@ typedef struct SWriteReq {
void* conn;
} SWriteReq;
void transReqQueueInit(queue* q);
void* transReqQueuePush(queue* q, SWriteReq* req);
void* transReqQueueRemove(void* arg);
void transReqQueueClear(queue* q);
// queue sending msgs
typedef struct {
queue node;

View File

@ -110,15 +110,15 @@ typedef struct SCliConn {
queue wq; // uv_write_t queue
} SCliConn;
// #define TRANS_CONN_REF_INC(tconn) ((tconn) ? (tconn)->ref++ : 0)
// #define TRANS_CONN_REF_DEC(tconn) ((tconn) ? (tconn)->ref-- : 0)
// #define TRANS_CONN_REF_GET(tconn) ((tconn) ? (tconn)->ref : 0)
typedef struct {
SCliConn* conn;
void* arg;
} SReqState;
typedef struct {
int64_t seq;
int32_t msgType;
} SFiterArg;
typedef struct SCliReq {
SReqCtx* ctx;
queue q;
@ -282,7 +282,13 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
static void cliWalkCb(uv_handle_t* handle, void* arg);
static FORCE_INLINE int32_t destroyAllReqs(SCliConn* SCliConn);
static FORCE_INLINE bool filterAllReq(void* e, void* arg);
static FORCE_INLINE bool filerBySeq(void* key, void* arg);
static FORCE_INLINE bool filterByQid(void* key, void* arg);
static FORCE_INLINE bool filterToDebug_timeoutMsg(void* key, void* arg);
static FORCE_INLINE bool filterToRmTimoutReq(void* key, void* arg);
typedef struct {
void* p;
HeapNode node;
@ -387,8 +393,6 @@ void cliConnMayUpdateTimer(SCliConn* conn, int timeout) {
(void)uv_timer_start(conn->timer, cliConnTimeout__checkReq, timeout, 0);
}
void cliHandleBatchResp(SCliConn* conn) { return; }
void destroyCliConnQTable(SCliConn* conn) {
void* pIter = taosHashIterate(conn->pQTable, NULL);
while (pIter != NULL) {
@ -404,12 +408,7 @@ void destroyCliConnQTable(SCliConn* conn) {
conn->pQTable = NULL;
}
typedef struct {
int64_t seq;
int32_t msgType;
} SFiterArg;
bool filteBySeq(void* key, void* arg) {
static bool filteBySeq(void* key, void* arg) {
SFiterArg* targ = arg;
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->seq == targ->seq && pReq->msg.msgType + 1 == targ->msgType) {
@ -632,6 +631,7 @@ void cliConnTimeout(uv_timer_t* handle) {
cliResetConnTimer(conn);
return;
}
tTrace("%s conn %p conn timeout", CONN_GET_INST_LABEL(conn), conn);
TAOS_UNUSED(transUnrefCliHandle(conn));
}
@ -642,8 +642,22 @@ bool filterToRmTimoutReq(void* key, void* arg) {
int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000);
if (elapse > READ_TIMEOUT) {
return true;
} else {
return false;
}
return true;
}
return false;
}
bool filterToDebug_timeoutMsg(void* key, void* arg) {
SCliReq* pReq = QUEUE_DATA(key, SCliReq, q);
if (pReq->msg.info.qId == 0 && !REQUEST_NO_RESP(&pReq->msg) && pReq->ctx) {
int64_t elapse = ((taosGetTimestampUs() - pReq->st) / 1000);
if (elapse > READ_TIMEOUT) {
tWarn("req %s timeout, elapse:%" PRId64 "ms", TMSG_INFO(pReq->msg.msgType), elapse);
return false;
}
return false;
}
return false;
}
@ -655,6 +669,8 @@ void cliConnCheckTimoutMsg(SCliConn* conn) {
SCliThrd* pThrd = conn->hostThrd;
STrans* pInst = pThrd->pInst;
transQueueRemoveByFilter(&conn->reqsSentOut, filterToDebug_timeoutMsg, NULL, &set, -1);
if (pInst->startReadTimer == 0) {
return;
}
@ -663,6 +679,7 @@ void cliConnCheckTimoutMsg(SCliConn* conn) {
return;
}
QUEUE_INIT(&set);
transQueueRemoveByFilter(&conn->reqsSentOut, filterToRmTimoutReq, NULL, &set, -1);
while (!QUEUE_IS_EMPTY(&set)) {
@ -1177,12 +1194,10 @@ static void cliBatchSendCb(uv_write_t* req, int status) {
}
cliConnMayUpdateTimer(conn, READ_TIMEOUT);
// if (!uv_is_readable(conn->stream)) {
if (conn->readerStart == 0) {
(void)uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb);
conn->readerStart = 1;
}
//}
if (!cliMayRecycleConn(conn)) {
(void)cliBatchSend(conn);

View File

@ -413,39 +413,6 @@ void* transCtxDumpBrokenlinkVal(STransCtx* ctx, int32_t* msgType) {
return ret;
}
void transReqQueueInit(queue* q) {
// init req queue
QUEUE_INIT(q);
}
void* transReqQueuePush(queue* q, SWriteReq* userReq) {
return NULL;
// uv_write_t* req = taosMemoryCalloc(1, sizeof(uv_write_t));
// req->data = userReq;
// QUEUE_PUSH(q, &userReq->q);
// return req;
}
void* transReqQueueRemove(void* arg) {
return NULL;
// void* ret = NULL;
// uv_write_t* req = arg;
// SWriteReq* userReq = req ? req->data : NULL;
// if (req == NULL) return NULL;
// QUEUE_REMOVE(&userReq->q);
// return userReq;
}
void transReqQueueClear(queue* q) {
return;
// while (!QUEUE_IS_EMPTY(q)) {
// queue* h = QUEUE_HEAD(q);
// QUEUE_REMOVE(h);
// SWriteReq* req = QUEUE_DATA(h, SWriteReq, q);
// taosMemoryFree(req);
// }
}
int32_t transQueueInit(STransQueue* wq, void (*freeFunc)(void* arg)) {
QUEUE_INIT(&wq->node);
wq->freeFunc = (void (*)(void*))freeFunc;