fix mem leak
This commit is contained in:
parent
293a96d959
commit
6838574c8f
|
@ -15,6 +15,10 @@
|
||||||
#ifdef USE_UV
|
#ifdef USE_UV
|
||||||
#include "transComm.h"
|
#include "transComm.h"
|
||||||
|
|
||||||
|
typedef struct SConnList {
|
||||||
|
queue conn;
|
||||||
|
} SConnList;
|
||||||
|
|
||||||
typedef struct SCliConn {
|
typedef struct SCliConn {
|
||||||
T_REF_DECLARE()
|
T_REF_DECLARE()
|
||||||
uv_connect_t connReq;
|
uv_connect_t connReq;
|
||||||
|
@ -25,7 +29,9 @@ typedef struct SCliConn {
|
||||||
|
|
||||||
SConnBuffer readBuf;
|
SConnBuffer readBuf;
|
||||||
STransQueue cliMsgs;
|
STransQueue cliMsgs;
|
||||||
|
|
||||||
queue q;
|
queue q;
|
||||||
|
SConnList* list;
|
||||||
|
|
||||||
STransCtx ctx;
|
STransCtx ctx;
|
||||||
bool broken; // link broken or not
|
bool broken; // link broken or not
|
||||||
|
@ -86,10 +92,6 @@ typedef struct SCliObj {
|
||||||
SCliThrd** pThreadObj;
|
SCliThrd** pThreadObj;
|
||||||
} SCliObj;
|
} SCliObj;
|
||||||
|
|
||||||
typedef struct SConnList {
|
|
||||||
queue conn;
|
|
||||||
} SConnList;
|
|
||||||
|
|
||||||
// conn pool
|
// conn pool
|
||||||
// add expire timeout and capacity limit
|
// add expire timeout and capacity limit
|
||||||
static void* createConnPool(int size);
|
static void* createConnPool(int size);
|
||||||
|
@ -101,7 +103,7 @@ static void doCloseIdleConn(void* param);
|
||||||
static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
|
static int sockDebugInfo(struct sockaddr* sockname, char* dst) {
|
||||||
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
|
struct sockaddr_in addr = *(struct sockaddr_in*)sockname;
|
||||||
|
|
||||||
char buf[20] = {0};
|
char buf[16] = {0};
|
||||||
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
|
int r = uv_ip4_name(&addr, (char*)buf, sizeof(buf));
|
||||||
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
|
sprintf(dst, "%s:%d", buf, ntohs(addr.sin_port));
|
||||||
return r;
|
return r;
|
||||||
|
@ -120,6 +122,8 @@ static void cliAsyncCb(uv_async_t* handle);
|
||||||
static void cliIdleCb(uv_idle_t* handle);
|
static void cliIdleCb(uv_idle_t* handle);
|
||||||
static void cliPrepareCb(uv_prepare_t* handle);
|
static void cliPrepareCb(uv_prepare_t* handle);
|
||||||
|
|
||||||
|
static int32_t allocConnRef(SCliConn* conn, bool update);
|
||||||
|
|
||||||
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
|
static int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg);
|
||||||
|
|
||||||
static SCliConn* cliCreateConn(SCliThrd* thrd);
|
static SCliConn* cliCreateConn(SCliThrd* thrd);
|
||||||
|
@ -500,9 +504,8 @@ void* destroyConnPool(void* pool) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||||
char key[128] = {0};
|
char key[32] = {0};
|
||||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||||
|
|
||||||
SHashObj* pPool = pool;
|
SHashObj* pPool = pool;
|
||||||
SConnList* plist = taosHashGet(pPool, key, strlen(key));
|
SConnList* plist = taosHashGet(pPool, key, strlen(key));
|
||||||
if (plist == NULL) {
|
if (plist == NULL) {
|
||||||
|
@ -520,13 +523,44 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||||
conn->status = ConnNormal;
|
conn->status = ConnNormal;
|
||||||
QUEUE_REMOVE(&conn->q);
|
QUEUE_REMOVE(&conn->q);
|
||||||
QUEUE_INIT(&conn->q);
|
QUEUE_INIT(&conn->q);
|
||||||
assert(h == &conn->q);
|
|
||||||
|
|
||||||
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
|
transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task);
|
||||||
conn->task = NULL;
|
conn->task = NULL;
|
||||||
|
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
|
static void addConnToPool(void* pool, SCliConn* conn) {
|
||||||
|
if (conn->status == ConnInPool) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
SCliThrd* thrd = conn->hostThrd;
|
||||||
|
CONN_HANDLE_THREAD_QUIT(thrd);
|
||||||
|
|
||||||
|
allocConnRef(conn, true);
|
||||||
|
|
||||||
|
STrans* pTransInst = thrd->pTransInst;
|
||||||
|
cliReleaseUnfinishedMsg(conn);
|
||||||
|
transQueueClear(&conn->cliMsgs);
|
||||||
|
transCtxCleanup(&conn->ctx);
|
||||||
|
conn->status = ConnInPool;
|
||||||
|
|
||||||
|
if (conn->list == NULL) {
|
||||||
|
char key[32] = {0};
|
||||||
|
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
|
||||||
|
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
|
||||||
|
conn->list = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||||
|
}
|
||||||
|
assert(conn->list != NULL);
|
||||||
|
QUEUE_INIT(&conn->q);
|
||||||
|
QUEUE_PUSH(&conn->list->conn, &conn->q);
|
||||||
|
|
||||||
|
assert(!QUEUE_IS_EMPTY(&conn->list->conn));
|
||||||
|
|
||||||
|
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
||||||
|
arg->param1 = conn;
|
||||||
|
arg->param2 = thrd;
|
||||||
|
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
|
||||||
|
}
|
||||||
static int32_t allocConnRef(SCliConn* conn, bool update) {
|
static int32_t allocConnRef(SCliConn* conn, bool update) {
|
||||||
if (update) {
|
if (update) {
|
||||||
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
transRemoveExHandle(transGetRefMgt(), conn->refId);
|
||||||
|
@ -557,38 +591,6 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void addConnToPool(void* pool, SCliConn* conn) {
|
|
||||||
if (conn->status == ConnInPool) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
SCliThrd* thrd = conn->hostThrd;
|
|
||||||
CONN_HANDLE_THREAD_QUIT(thrd);
|
|
||||||
|
|
||||||
allocConnRef(conn, true);
|
|
||||||
|
|
||||||
STrans* pTransInst = thrd->pTransInst;
|
|
||||||
cliReleaseUnfinishedMsg(conn);
|
|
||||||
transQueueClear(&conn->cliMsgs);
|
|
||||||
transCtxCleanup(&conn->ctx);
|
|
||||||
conn->status = ConnInPool;
|
|
||||||
|
|
||||||
char key[128] = {0};
|
|
||||||
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
|
|
||||||
tTrace("%s conn %p added to conn pool, read buf cap:%d", CONN_GET_INST_LABEL(conn), conn, conn->readBuf.cap);
|
|
||||||
|
|
||||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
|
||||||
// list already create before
|
|
||||||
assert(plist != NULL);
|
|
||||||
QUEUE_INIT(&conn->q);
|
|
||||||
QUEUE_PUSH(&plist->conn, &conn->q);
|
|
||||||
|
|
||||||
assert(!QUEUE_IS_EMPTY(&plist->conn));
|
|
||||||
|
|
||||||
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
|
|
||||||
arg->param1 = conn;
|
|
||||||
arg->param2 = thrd;
|
|
||||||
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
|
|
||||||
}
|
|
||||||
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
|
||||||
SCliConn* conn = handle->data;
|
SCliConn* conn = handle->data;
|
||||||
SConnBuffer* pBuf = &conn->readBuf;
|
SConnBuffer* pBuf = &conn->readBuf;
|
||||||
|
|
Loading…
Reference in New Issue