fix(rpc): avoid partial memleak
This commit is contained in:
parent
9de53b0019
commit
c15ec36513
|
@ -212,10 +212,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CONN_NO_PERSIST_BY_APP(conn) \
|
#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||||
(((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||||
#define CONN_RELEASE_BY_SERVER(conn) \
|
|
||||||
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
|
||||||
|
|
||||||
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
||||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
||||||
|
@ -290,9 +288,8 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType),
|
||||||
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
||||||
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
|
||||||
|
|
||||||
conn->secured = pHead->secured;
|
conn->secured = pHead->secured;
|
||||||
|
|
||||||
|
@ -358,12 +355,10 @@ void cliHandleExcept(SCliConn* pConn) {
|
||||||
|
|
||||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||||
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
||||||
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle,
|
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType));
|
||||||
TMSG_INFO(transMsg.msgType));
|
|
||||||
if (transMsg.ahandle == NULL) {
|
if (transMsg.ahandle == NULL) {
|
||||||
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
|
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
|
||||||
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
|
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle);
|
||||||
transMsg.ahandle);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||||
|
@ -546,6 +541,7 @@ static void cliDestroy(uv_handle_t* handle) {
|
||||||
transCtxCleanup(&conn->ctx);
|
transCtxCleanup(&conn->ctx);
|
||||||
transQueueDestroy(&conn->cliMsgs);
|
transQueueDestroy(&conn->cliMsgs);
|
||||||
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
transDestroyBuffer(&conn->readBuf);
|
||||||
taosMemoryFree(conn);
|
taosMemoryFree(conn);
|
||||||
}
|
}
|
||||||
static bool cliHandleNoResp(SCliConn* conn) {
|
static bool cliHandleNoResp(SCliConn* conn) {
|
||||||
|
@ -635,9 +631,8 @@ void cliSend(SCliConn* pConn) {
|
||||||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType),
|
||||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
|
||||||
|
|
||||||
if (pHead->persist == 1) {
|
if (pHead->persist == 1) {
|
||||||
CONN_SET_PERSIST_BY_APP(pConn);
|
CONN_SET_PERSIST_BY_APP(pConn);
|
||||||
|
@ -675,10 +670,9 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
tDebug("cli work thread %p start to quit", pThrd);
|
tDebug("cli work thread %p start to quit", pThrd);
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
destroyConnPool(pThrd->pool);
|
destroyConnPool(pThrd->pool);
|
||||||
|
|
||||||
uv_timer_stop(&pThrd->timer);
|
uv_timer_stop(&pThrd->timer);
|
||||||
|
|
||||||
pThrd->quit = true;
|
pThrd->quit = true;
|
||||||
|
|
||||||
uv_stop(pThrd->loop);
|
uv_stop(pThrd->loop);
|
||||||
}
|
}
|
||||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
|
|
@ -195,7 +195,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
|
||||||
void transDestroyAsyncPool(SAsyncPool* pool) {
|
void transDestroyAsyncPool(SAsyncPool* pool) {
|
||||||
for (int i = 0; i < pool->nAsync; i++) {
|
for (int i = 0; i < pool->nAsync; i++) {
|
||||||
uv_async_t* async = &(pool->asyncs[i]);
|
uv_async_t* async = &(pool->asyncs[i]);
|
||||||
|
uv_close((uv_handle_t*)async, NULL);
|
||||||
SAsyncItem* item = async->data;
|
SAsyncItem* item = async->data;
|
||||||
taosThreadMutexDestroy(&item->mtx);
|
taosThreadMutexDestroy(&item->mtx);
|
||||||
taosMemoryFree(item);
|
taosMemoryFree(item);
|
||||||
|
|
|
@ -126,6 +126,11 @@ static void uvWorkerAsyncCb(uv_async_t* handle);
|
||||||
static void uvAcceptAsyncCb(uv_async_t* handle);
|
static void uvAcceptAsyncCb(uv_async_t* handle);
|
||||||
static void uvShutDownCb(uv_shutdown_t* req, int status);
|
static void uvShutDownCb(uv_shutdown_t* req, int status);
|
||||||
|
|
||||||
|
static void uvFreeCb(uv_handle_t* handle) {
|
||||||
|
//
|
||||||
|
taosMemoryFree(handle);
|
||||||
|
}
|
||||||
|
|
||||||
static void uvStartSendRespInternal(SSrvMsg* smsg);
|
static void uvStartSendRespInternal(SSrvMsg* smsg);
|
||||||
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
||||||
static void uvStartSendResp(SSrvMsg* msg);
|
static void uvStartSendResp(SSrvMsg* msg);
|
||||||
|
@ -141,8 +146,7 @@ static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
|
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, uvHandleRegister};
|
||||||
uvHandleRegister};
|
|
||||||
|
|
||||||
static void uvDestroyConn(uv_handle_t* handle);
|
static void uvDestroyConn(uv_handle_t* handle);
|
||||||
|
|
||||||
|
@ -205,13 +209,12 @@ static void uvHandleReq(SSrvConn* pConn) {
|
||||||
}
|
}
|
||||||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr),
|
||||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||||
ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
|
||||||
} else {
|
} else {
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, TMSG_INFO(transMsg.msgType),
|
||||||
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port),
|
||||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
|
transMsg.contLen, pHead->noResp);
|
||||||
// no ref here
|
// no ref here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -318,6 +321,8 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
|
||||||
} else {
|
} else {
|
||||||
tError("fail to dispatch conn to work thread");
|
tError("fail to dispatch conn to work thread");
|
||||||
}
|
}
|
||||||
|
uv_close((uv_handle_t*)req->data, uvFreeCb);
|
||||||
|
// taosMemoryFree(req->data);
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,9 +354,8 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
|
||||||
|
|
||||||
char* msg = (char*)pHead;
|
char* msg = (char*)pHead;
|
||||||
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
||||||
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
|
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr),
|
||||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||||
ntohs(pConn->locaddr.sin_port));
|
|
||||||
pHead->msgLen = htonl(len);
|
pHead->msgLen = htonl(len);
|
||||||
|
|
||||||
wb->base = msg;
|
wb->base = msg;
|
||||||
|
@ -429,11 +433,39 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
(*transAsyncHandle[msg->type])(msg, pThrd);
|
(*transAsyncHandle[msg->type])(msg, pThrd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static void uvWalkCb(uv_handle_t* handle, void* arg) {
|
||||||
|
if (!uv_is_closing(handle)) {
|
||||||
|
uv_close(handle, NULL);
|
||||||
|
// uv_unref(handle);
|
||||||
|
tDebug("handle: %p -----test----", handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#define MAKE_VALGRIND_HAPPY(loop) \
|
||||||
|
do { \
|
||||||
|
uv_walk(loop, uvWalkCb, NULL); \
|
||||||
|
uv_run(loop, UV_RUN_DEFAULT); \
|
||||||
|
uv_loop_close(loop); \
|
||||||
|
} while (0);
|
||||||
|
|
||||||
static void uvAcceptAsyncCb(uv_async_t* async) {
|
static void uvAcceptAsyncCb(uv_async_t* async) {
|
||||||
SServerObj* srv = async->data;
|
SServerObj* srv = async->data;
|
||||||
tDebug("close server port %d", srv->port);
|
tDebug("close server port %d", srv->port);
|
||||||
uv_close((uv_handle_t*)&srv->server, NULL);
|
uv_walk(srv->loop, uvWalkCb, NULL);
|
||||||
uv_stop(srv->loop);
|
// uv_close((uv_handle_t*)async, NULL);
|
||||||
|
// uv_close((uv_handle_t*)&srv->server, NULL);
|
||||||
|
// uv_stop(srv->loop);
|
||||||
|
// uv_print_all_handles(srv->loop, stderr);
|
||||||
|
// int ref = uv_loop_alive(srv->loop);
|
||||||
|
// assert(ref == 0);
|
||||||
|
// tError("active size %d", ref);
|
||||||
|
// uv_stop(srv->loop);
|
||||||
|
// uv_run(srv->loop, UV_RUN_DEFAULT);
|
||||||
|
// fprintf(stderr, "------------------------------------");
|
||||||
|
// uv_print_all_handles(srv->loop, stderr);
|
||||||
|
|
||||||
|
// int ret = uv_loop_close(srv->loop);
|
||||||
|
// tError("(loop)->active_reqs.count: %d, ret: %d", (srv->loop)->active_reqs.count, ret);
|
||||||
|
// assert(ret == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void uvShutDownCb(uv_shutdown_t* req, int status) {
|
static void uvShutDownCb(uv_shutdown_t* req, int status) {
|
||||||
|
@ -455,16 +487,16 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
||||||
|
|
||||||
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
|
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
|
||||||
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
||||||
|
wr->data = cli;
|
||||||
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
|
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
|
||||||
|
|
||||||
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
|
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
|
||||||
|
|
||||||
tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
|
tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
|
||||||
|
|
||||||
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
|
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
|
||||||
} else {
|
} else {
|
||||||
uv_close((uv_handle_t*)cli, NULL);
|
uv_close((uv_handle_t*)cli, NULL);
|
||||||
taosMemoryFree(cli);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
|
@ -474,7 +506,10 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
tError("read error %s", uv_err_name(nread));
|
tError("read error %s", uv_err_name(nread));
|
||||||
}
|
}
|
||||||
// TODO(log other failure reason)
|
// TODO(log other failure reason)
|
||||||
// uv_close((uv_handle_t*)q, NULL);
|
tError("failed to create connect: %p", q);
|
||||||
|
taosMemoryFree(buf->base);
|
||||||
|
uv_close((uv_handle_t*)q, NULL);
|
||||||
|
// taosMemoryFree(q);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// free memory allocated by
|
// free memory allocated by
|
||||||
|
@ -650,6 +685,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
|
|
||||||
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
tTrace("work thread quit");
|
tTrace("work thread quit");
|
||||||
|
// uv_walk(thrd->loop, uvWalkCb, NULL);
|
||||||
uv_loop_close(thrd->loop);
|
uv_loop_close(thrd->loop);
|
||||||
uv_stop(thrd->loop);
|
uv_stop(thrd->loop);
|
||||||
}
|
}
|
||||||
|
@ -713,6 +749,7 @@ End:
|
||||||
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
||||||
thrd->quit = true;
|
thrd->quit = true;
|
||||||
if (QUEUE_IS_EMPTY(&thrd->conn)) {
|
if (QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
|
// uv_walk(thrd->loop, uvWalkCb, NULL);
|
||||||
uv_loop_close(thrd->loop);
|
uv_loop_close(thrd->loop);
|
||||||
uv_stop(thrd->loop);
|
uv_stop(thrd->loop);
|
||||||
} else {
|
} else {
|
||||||
|
@ -765,8 +802,9 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
taosThreadJoin(pThrd->thread, NULL);
|
taosThreadJoin(pThrd->thread, NULL);
|
||||||
taosMemoryFree(pThrd->loop);
|
// MAKE_VALGRIND_HAPPY(pThrd->loop);
|
||||||
transDestroyAsyncPool(pThrd->asyncPool);
|
transDestroyAsyncPool(pThrd->asyncPool);
|
||||||
|
taosMemoryFree(pThrd->loop);
|
||||||
taosMemoryFree(pThrd);
|
taosMemoryFree(pThrd);
|
||||||
}
|
}
|
||||||
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
|
@ -784,6 +822,8 @@ void transCloseServer(void* arg) {
|
||||||
uv_async_send(srv->pAcceptAsync);
|
uv_async_send(srv->pAcceptAsync);
|
||||||
taosThreadJoin(srv->thread, NULL);
|
taosThreadJoin(srv->thread, NULL);
|
||||||
|
|
||||||
|
MAKE_VALGRIND_HAPPY(srv->loop);
|
||||||
|
|
||||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
sendQuitToWorkThrd(srv->pThreadObj[i]);
|
sendQuitToWorkThrd(srv->pThreadObj[i]);
|
||||||
destroyWorkThrd(srv->pThreadObj[i]);
|
destroyWorkThrd(srv->pThreadObj[i]);
|
||||||
|
|
Loading…
Reference in New Issue