From 7f7b1a423a2b42b57d47a1aa1643ed0637c97e37 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 12 May 2022 12:10:22 +0800 Subject: [PATCH 1/6] fix(rpc): avoid fd leak --- ...utorTests.cpp => index_executor_tests.cpp} | 0 source/libs/transport/src/transCli.c | 23 ++++++------ source/libs/transport/src/transSrv.c | 36 +++++++++++++------ tests/pytest/util/dnodes.py | 4 +-- 4 files changed, 39 insertions(+), 24 deletions(-) rename source/libs/executor/test/{indexexcutorTests.cpp => index_executor_tests.cpp} (100%) diff --git a/source/libs/executor/test/indexexcutorTests.cpp b/source/libs/executor/test/index_executor_tests.cpp similarity index 100% rename from source/libs/executor/test/indexexcutorTests.cpp rename to source/libs/executor/test/index_executor_tests.cpp diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 718be6aa64..5570bdcd3e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -21,15 +21,16 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; uv_write_t writeReq; - void* hostThrd; - SConnBuffer readBuf; - void* data; - STransQueue cliMsgs; - queue conn; - uint64_t expireTime; - int hThrdIdx; - STransCtx ctx; + void* hostThrd; + int hThrdIdx; + + SConnBuffer readBuf; + STransQueue cliMsgs; + queue conn; + uint64_t expireTime; + + STransCtx ctx; bool broken; // link broken or not ConnStatus status; // @@ -157,13 +158,11 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); transClearBuffer(&conn->readBuf); \ transFreeMsg(transContFromHead((char*)head)); \ tDebug("cli conn %p receive release request, ref: %d", conn, T_REF_VAL_GET(conn)); \ - while (T_REF_VAL_GET(conn) > 1) { \ - transUnrefCliHandle(conn); \ - } \ - if (T_REF_VAL_GET(conn) == 1) { \ + if (T_REF_VAL_GET(conn) > 1) { \ transUnrefCliHandle(conn); \ } \ destroyCmsg(pMsg); \ + addConnToPool(((SCliThrdObj*)conn->hostThrd)->pool, conn); \ return; \ } \ } while (0) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index af66d39904..d0d45f11e3 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -35,7 +35,6 @@ typedef struct SSrvConn { uv_timer_t pTimer; queue queue; - int persist; // persist connection or not SConnBuffer readBuf; // read buf, int inType; void* pTransInst; // rpc init @@ -138,6 +137,7 @@ static void destroySmsg(SSrvMsg* smsg); // check whether already read complete packet static SSrvConn* createConn(void* hThrd); static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); +static int reallocConnRefHandle(SSrvConn* conn); static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); @@ -164,7 +164,7 @@ static void* transWorkerThread(void* arg); static void* transAcceptThread(void* arg); // add handle loop -static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName); +static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); #define CONN_SHOULD_RELEASE(conn, head) \ @@ -517,7 +517,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { int64_t refId = transMsg.refId; SExHandle* exh2 = uvAcquireExHandle(refId); if (exh2 == NULL || exh1 != exh2) { - tTrace("server handle %p except msg, ignore it", exh1); + tTrace("server handle except msg %p, ignore it", exh1); uvReleaseExHandle(refId); destroySmsg(msg); continue; @@ -581,11 +581,12 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { if (uv_accept(stream, (uv_stream_t*)cli) == 0) { if (pObj->numOfWorkerReady < pObj->numOfThreads) { - tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, pObj->numOfWorkerReady); + tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, + pObj->numOfWorkerReady); uv_close((uv_handle_t*)cli, NULL); return; } - + uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); wr->data = cli; uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); @@ -681,14 +682,14 @@ void* transAcceptThread(void* arg) { return NULL; } -void uvOnPipeConnectionCb(uv_connect_t *connect, int status) { +void uvOnPipeConnectionCb(uv_connect_t* connect, int status) { if (status != 0) { return; } SWorkThrdObj* pThrd = container_of(connect, SWorkThrdObj, connect_req); uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); } -static bool addHandleToWorkloop(SWorkThrdObj* pThrd,char *pipeName) { +static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName) { pThrd->loop = (uv_loop_t*)taosMemoryMalloc(sizeof(uv_loop_t)); if (0 != uv_loop_init(pThrd->loop)) { return false; @@ -787,6 +788,19 @@ static void destroyConn(SSrvConn* conn, bool clear) { // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); } } +static int reallocConnRefHandle(SSrvConn* conn) { + uvReleaseExHandle(conn->refId); + uvRemoveExHandle(conn->refId); + // avoid app continue to send msg on invalid handle + SExHandle* exh = taosMemoryMalloc(sizeof(SExHandle)); + exh->handle = conn; + exh->pThrd = conn->hostThrd; + exh->refId = uvAddExHandle(exh); + uvAcquireExHandle(exh->refId); + conn->refId = exh->refId; + + return 0; +} static void uvDestroyConn(uv_handle_t* handle) { SSrvConn* conn = handle->data; if (conn == NULL) { @@ -822,7 +836,7 @@ static void uvPipeListenCb(uv_stream_t* handle, int status) { ASSERT(status == 0); SServerObj* srv = container_of(handle, SServerObj, pipeListen); - uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]); + uv_pipe_t* pipe = &(srv->pipe[srv->numOfWorkerReady][0]); ASSERT(0 == uv_pipe_init(srv->loop, pipe, 1)); ASSERT(0 == uv_accept((uv_stream_t*)&srv->pipeListen, (uv_stream_t*)pipe)); @@ -859,7 +873,8 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%p-%lu", taosSafeRand(), GetCurrentProcessId()); #else char pipeName[PATH_MAX] = {0}; - snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), taosGetSelfPthreadId()); + snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08X-%lu", tsTempDir, TD_DIRSEP, taosSafeRand(), + taosGetSelfPthreadId()); #endif assert(0 == uv_pipe_bind(&srv->pipeListen, pipeName)); assert(0 == uv_listen((uv_stream_t*)&srv->pipeListen, SOMAXCONN, uvPipeListenCb)); @@ -874,7 +889,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t)); thrd->pipe = &(srv->pipe[i][1]); // init read - if (false == addHandleToWorkloop(thrd,pipeName)) { + if (false == addHandleToWorkloop(thrd, pipeName)) { goto End; } int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); @@ -958,6 +973,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { } void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { SSrvConn* conn = msg->pConn; + reallocConnRefHandle(conn); if (conn->status == ConnAcquire) { if (!transQueuePush(&conn->srvMsgs, msg)) { return; diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 9dcd485194..3c23e784c5 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -35,7 +35,7 @@ class TDSimClient: "tableIncStepPerVnode": "10000", "maxVgroupsPerDb": "1000", "sdbDebugFlag": "143", - "rpcDebugFlag": "135", + "rpcDebugFlag": "143", "tmrDebugFlag": "131", "cDebugFlag": "135", "udebugFlag": "135", @@ -136,7 +136,7 @@ class TDDnode: "tsdbDebugFlag": "135", "mDebugFlag": "135", "sdbDebugFlag": "135", - "rpcDebugFlag": "135", + "rpcDebugFlag": "143", "tmrDebugFlag": "131", "cDebugFlag": "135", "httpDebugFlag": "135", From dd8e642f3771d9cad9dcbc4f56e2e78ddca896d4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 12 May 2022 13:35:42 +0800 Subject: [PATCH 2/6] fix(rpc): avoid fd leak --- source/libs/qcom/src/queryUtil.c | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 3e3e393f5f..b7a3395206 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -153,10 +153,10 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra .handle = pInfo->msgInfo.handle, .persistHandle = persistHandle, .code = 0}; - if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH || - pInfo->msgType == TDMT_VND_QUERY_CONTINUE) { - rpcMsg.persistHandle = 1; - } + // if (pInfo->msgType == TDMT_VND_QUERY || pInfo->msgType == TDMT_VND_FETCH || + // pInfo->msgType == TDMT_VND_QUERY_CONTINUE) { + // rpcMsg.persistHandle = 1; + //} assert(pInfo->fp != NULL); @@ -168,7 +168,7 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp return asyncSendMsgToServerExt(pTransporter, epSet, pTransporterId, pInfo, false, NULL); } -char *jobTaskStatusStr(int32_t status) { +char* jobTaskStatusStr(int32_t status) { switch (status) { case JOB_TASK_STATUS_NULL: return "NULL"; @@ -197,13 +197,10 @@ char *jobTaskStatusStr(int32_t status) { SSchema createSchema(int8_t type, int32_t bytes, col_id_t colId, const char* name) { SSchema s = {0}; - s.type = type; + s.type = type; s.bytes = bytes; s.colId = colId; tstrncpy(s.name, name, tListLen(s.name)); return s; } - - - From 4f7886d2083c880d3ec66bca56557ccad31cccd9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 12 May 2022 15:29:19 +0800 Subject: [PATCH 3/6] fix(rpc): avoid fd leak --- source/libs/transport/src/transSrv.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index d0d45f11e3..a459e604ad 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -363,6 +363,10 @@ void uvOnSendCb(uv_write_t* req, int status) { if (msg->type == Release && conn->status != ConnNormal) { conn->status = ConnNormal; transUnrefSrvHandle(conn); + reallocConnRefHandle(conn); + destroySmsg(msg); + transQueueClear(&conn->srvMsgs); + return; } destroySmsg(msg); // send second data, just use for push @@ -973,7 +977,7 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { } void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { SSrvConn* conn = msg->pConn; - reallocConnRefHandle(conn); + // reallocConnRefHandle(conn); if (conn->status == ConnAcquire) { if (!transQueuePush(&conn->srvMsgs, msg)) { return; From ddee7344d5b1cfa03dabeb2e41eade5f9d23492b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 13 May 2022 11:25:35 +0800 Subject: [PATCH 4/6] fix(rpc): avoid fd leak --- source/libs/transport/src/transSrv.c | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index a459e604ad..7566d7147c 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -360,14 +360,14 @@ void uvOnSendCb(uv_write_t* req, int status) { tTrace("server conn %p data already was written on stream", conn); if (!transQueueEmpty(&conn->srvMsgs)) { SSrvMsg* msg = transQueuePop(&conn->srvMsgs); - if (msg->type == Release && conn->status != ConnNormal) { - conn->status = ConnNormal; - transUnrefSrvHandle(conn); - reallocConnRefHandle(conn); - destroySmsg(msg); - transQueueClear(&conn->srvMsgs); - return; - } + // if (msg->type == Release && conn->status != ConnNormal) { + // conn->status = ConnNormal; + // transUnrefSrvHandle(conn); + // reallocConnRefHandle(conn); + // destroySmsg(msg); + // transQueueClear(&conn->srvMsgs); + // return; + //} destroySmsg(msg); // send second data, just use for push if (!transQueueEmpty(&conn->srvMsgs)) { @@ -425,8 +425,15 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { if (pConn->status == ConnNormal) { pHead->msgType = pConn->inType + 1; } else { - pHead->msgType = smsg->type == Release ? 0 : pMsg->msgType; + if (smsg->type == Release) { + pHead->msgType = 0; + pConn->status = ConnNormal; + transUnrefSrvHandle(pConn); + } else { + pHead->msgType = pMsg->msgType; + } } + pHead->release = smsg->type == Release ? 1 : 0; pHead->code = htonl(pMsg->code); @@ -977,8 +984,8 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { } void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { SSrvConn* conn = msg->pConn; - // reallocConnRefHandle(conn); if (conn->status == ConnAcquire) { + reallocConnRefHandle(conn); if (!transQueuePush(&conn->srvMsgs, msg)) { return; } From c56b0e0b15736127fbe9088efe0ed23361aee724 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 13 May 2022 12:24:57 +0800 Subject: [PATCH 5/6] fix(rpc): avoid fd leak --- source/libs/transport/src/transSrv.c | 38 ++++++++++++++-------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 7566d7147c..f8e4a8bd09 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -167,25 +167,25 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tTrace("server conn %p received release request", conn); \ - \ - STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head) && conn->status == ConnAcquire) { \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + tTrace("server conn %p received release request", conn); \ + \ + STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ + SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + reallocConnRefHandle(conn); \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ } while (0) #define SRV_RELEASE_UV(loop) \ From fee540c1fa38b25d2fa7df717b4acb45bec82db9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 13 May 2022 13:09:34 +0800 Subject: [PATCH 6/6] fix(rpc): avoid fd leak --- source/libs/transport/src/transSrv.c | 39 ++++++++++++++-------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index f8e4a8bd09..fc840691b6 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -167,25 +167,26 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(SWorkThrdObj* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head) && conn->status == ConnAcquire) { \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tTrace("server conn %p received release request", conn); \ - \ - STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - reallocConnRefHandle(conn); \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + tTrace("server conn %p received release request", conn); \ + \ + STransMsg tmsg = {.code = 0, .handle = (void*)conn, .ahandle = NULL}; \ + SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + reallocConnRefHandle(conn); \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ } while (0) #define SRV_RELEASE_UV(loop) \