fix indirect leak

This commit is contained in:
yihaoDeng 2024-09-06 14:25:50 +08:00
parent 6f80e22d7a
commit 0b21b7dff1
3 changed files with 98 additions and 166 deletions

View File

@ -103,7 +103,7 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->timeToGetConn = 10 * 1000;
}
pRpc->notWaitAvaliableConn = pInit->notWaitAvaliableConn;
pRpc->tcphandle =
(*taosInitHandle[pRpc->connType])(ip, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
@ -124,14 +124,20 @@ _end:
}
void rpcClose(void* arg) {
tInfo("start to close rpc");
if (arg == NULL) {
return;
}
(void)transRemoveExHandle(transGetInstMgt(), (int64_t)arg);
(void)transReleaseExHandle(transGetInstMgt(), (int64_t)arg);
tInfo("end to close rpc");
return;
}
void rpcCloseImpl(void* arg) {
if (arg == NULL) return;
SRpcInfo* pRpc = (SRpcInfo*)arg;
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
if (pRpc->tcphandle != NULL) {
(*taosCloseHandle[pRpc->connType])(pRpc->tcphandle);
}
taosMemoryFree(pRpc);
}
@ -168,7 +174,7 @@ int32_t rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64
return transSendRequest(shandle, pEpSet, pMsg, NULL);
}
int32_t rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid, SRpcCtx* pCtx) {
if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0|| pRid == NULL) {
if (pCtx != NULL || pMsg->info.handle != 0 || pMsg->info.noResp != 0 || pRid == NULL) {
return transSendRequest(shandle, pEpSet, pMsg, pCtx);
} else {
return transSendRequestWithId(shandle, pEpSet, pMsg, pRid);

View File

@ -99,12 +99,11 @@ typedef struct SCliMsg {
} SCliMsg;
typedef struct SCliThrd {
TdThread thread; // tid
int64_t pid; // pid
uv_loop_t* loop;
SAsyncPool* asyncPool;
uv_prepare_t* prepare;
void* pool; // conn pool
TdThread thread; // tid
int64_t pid; // pid
uv_loop_t* loop;
SAsyncPool* asyncPool;
void* pool; // conn pool
// timer handles
SArray* timerList;
// msg queue
@ -167,7 +166,6 @@ static void cliSendCb(uv_write_t* req, int status);
static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle);
static void cliIdleCb(uv_idle_t* handle);
static void cliPrepareCb(uv_prepare_t* handle);
static void cliHandleBatchReq(SCliBatch* pBatch, SCliThrd* pThrd);
static void cliSendBatchCb(uv_write_t* req, int status);
@ -231,7 +229,9 @@ static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
// thread obj
static int32_t createThrdObj(void* trans, SCliThrd** pThrd);
static void destroyThrdObj(SCliThrd* pThrd);
static void cliWalkCb(uv_handle_t* handle, void* arg);
int32_t cliSendQuit(SCliThrd* thrd);
static void cliWalkCb(uv_handle_t* handle, void* arg);
#define CLI_RELEASE_UV(loop) \
do { \
@ -2119,33 +2119,6 @@ static void cliAsyncCb(uv_async_t* handle) {
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
}
static void cliPrepareCb(uv_prepare_t* handle) {
SCliThrd* thrd = handle->data;
tTrace("prepare work start");
SAsyncPool* pool = thrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
(void)taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
(void)taosThreadMutexUnlock(&item->mtx);
int count = 0;
while (!QUEUE_IS_EMPTY(&wq)) {
queue* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h);
SCliMsg* pMsg = QUEUE_DATA(h, SCliMsg, q);
(*cliAsyncHandle[pMsg->type])(pMsg, thrd);
count++;
}
}
tTrace("prepare work end");
if (thrd->stopMsg != NULL) cliHandleQuit(thrd->stopMsg, thrd);
}
void cliDestroyConnMsgs(SCliConn* conn, bool destroy) {
transCtxCleanup(&conn->ctx);
@ -2260,6 +2233,12 @@ void* transInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads,
_err:
if (cli) {
for (int i = 0; i < cli->numOfThreads; i++) {
if (cli->pThreadObj[i]) {
(void)cliSendQuit(cli->pThreadObj[i]);
destroyThrdObj(cli->pThreadObj[i]);
}
}
taosMemoryFree(cli->pThreadObj);
taosMemoryFree(cli);
}
@ -2339,37 +2318,6 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(code, NULL, _end);
}
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
if (pThrd->prepare == NULL) {
tError("failed to create prepre since:%s", tstrerror(code));
TAOS_CHECK_GOTO(code, NULL, _end);
}
code = uv_prepare_init(pThrd->loop, pThrd->prepare);
if (code != 0) {
tError("failed to create prepre since:%s", uv_err_name(code));
TAOS_CHECK_GOTO(TSDB_CODE_THIRDPARTY_ERROR, NULL, _end);
}
pThrd->prepare->data = pThrd;
int32_t timerSize = 64;
pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
if (pThrd->timerList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
for (int i = 0; i < timerSize; i++) {
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
if (timer == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
(void)uv_timer_init(pThrd->loop, timer);
if (taosArrayPush(pThrd->timerList, &timer) == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
}
pThrd->pool = createConnPool(4);
if (pThrd->pool == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -2402,6 +2350,23 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
int32_t timerSize = 64;
pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
if (pThrd->timerList == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
for (int i = 0; i < timerSize; i++) {
uv_timer_t* timer = taosMemoryCalloc(1, sizeof(uv_timer_t));
if (timer == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
(void)uv_timer_init(pThrd->loop, timer);
if (taosArrayPush(pThrd->timerList, &timer) == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _end);
}
}
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
pThrd->pTransInst = trans;
pThrd->quit = false;
@ -2411,17 +2376,21 @@ static int32_t createThrdObj(void* trans, SCliThrd** ppThrd) {
_end:
if (pThrd) {
(void)taosThreadMutexDestroy(&pThrd->msgMtx);
(void)uv_loop_close(pThrd->loop);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd->prepare);
(void)taosThreadMutexDestroy(&pThrd->msgMtx);
transAsyncPoolDestroy(pThrd->asyncPool);
for (int i = 0; i < taosArrayGetSize(pThrd->timerList); i++) {
uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
taosMemoryFree(timer);
}
taosArrayDestroy(pThrd->timerList);
taosMemoryFree(pThrd->prepare);
destroyConnPool(pThrd);
transDQDestroy(pThrd->delayQueue, NULL);
transDQDestroy(pThrd->timeoutQueue, NULL);
transDQDestroy(pThrd->waitConnQueue, NULL);
taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);
taosHashCleanup(pThrd->batchCache);
@ -2450,8 +2419,8 @@ static void destroyThrdObj(SCliThrd* pThrd) {
uv_timer_t* timer = taosArrayGetP(pThrd->timerList, i);
taosMemoryFree(timer);
}
uv_loop_close(pThrd->loop);
taosArrayDestroy(pThrd->timerList);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop);
taosHashCleanup(pThrd->fqdn2ipCache);
taosHashCleanup(pThrd->failFastCache);

View File

@ -82,14 +82,13 @@ typedef struct {
int64_t ver;
} SIpWhiteListTab;
typedef struct SWorkThrd {
TdThread thread;
uv_connect_t connect_req;
uv_pipe_t* pipe;
uv_os_fd_t fd;
uv_loop_t* loop;
SAsyncPool* asyncPool;
uv_prepare_t* prepare;
queue msg;
TdThread thread;
uv_connect_t connect_req;
uv_pipe_t* pipe;
uv_os_fd_t fd;
uv_loop_t* loop;
SAsyncPool* asyncPool;
queue msg;
queue conn;
void* pTransInst;
@ -98,6 +97,7 @@ typedef struct SWorkThrd {
SIpWhiteListTab* pWhiteList;
int64_t whiteListVer;
int8_t enableIpWhiteList;
int8_t inited;
} SWorkThrd;
typedef struct SServerObj {
@ -139,7 +139,6 @@ static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf)
static void uvWorkerAsyncCb(uv_async_t* handle);
static void uvAcceptAsyncCb(uv_async_t* handle);
static void uvShutDownCb(uv_shutdown_t* req, int status);
static void uvPrepareCb(uv_prepare_t* handle);
static bool uvRecvReleaseReq(SSvrConn* conn, STransMsgHead* pHead);
@ -180,6 +179,11 @@ static void uvDestroyConn(uv_handle_t* handle);
static void* transWorkerThread(void* arg);
static void* transAcceptThread(void* arg);
static void destroyWorkThrd(SWorkThrd* pThrd);
static void destroyWorkThrdObj(SWorkThrd* pThrd);
static void sendQuitToWorkThrd(SWorkThrd* pThrd);
// add handle loop
static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName);
static int32_t addHandleToAcceptloop(void* arg);
@ -849,52 +853,6 @@ static bool uvRecvReleaseReq(SSvrConn* pConn, STransMsgHead* pHead) {
}
return false;
}
static void uvPrepareCb(uv_prepare_t* handle) {
// prepare callback
SWorkThrd* pThrd = handle->data;
SAsyncPool* pool = pThrd->asyncPool;
for (int i = 0; i < pool->nAsync; i++) {
uv_async_t* async = &(pool->asyncs[i]);
SAsyncItem* item = async->data;
queue wq;
(void)taosThreadMutexLock(&item->mtx);
QUEUE_MOVE(&item->qmsg, &wq);
(void)taosThreadMutexUnlock(&item->mtx);
while (!QUEUE_IS_EMPTY(&wq)) {
queue* head = QUEUE_HEAD(&wq);
QUEUE_REMOVE(head);
SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q);
if (msg == NULL) {
tError("unexcept occurred, continue");
continue;
}
// release handle to rpc init
if (msg->type == Quit || msg->type == Update) {
(*transAsyncHandle[msg->type])(msg, pThrd);
continue;
} else {
STransMsg transMsg = msg->msg;
SExHandle* exh1 = transMsg.info.handle;
int64_t refId = transMsg.info.refId;
SExHandle* exh2 = transAcquireExHandle(transGetSvrRefMgt(), refId);
if (exh2 == NULL || exh1 != exh2) {
tTrace("handle except msg %p, ignore it", exh1);
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
destroySmsg(msg);
continue;
}
msg->pConn = exh1->handle;
(void)transReleaseExHandle(transGetSvrRefMgt(), refId);
(*transAsyncHandle[msg->type])(msg, pThrd);
}
}
}
}
static void uvWorkDoTask(uv_work_t* req) {
// doing time-consumeing task
@ -1101,25 +1059,6 @@ static int32_t addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
QUEUE_INIT(&pThrd->msg);
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
if (pThrd->prepare == NULL) {
tError("failed to init prepare");
return TSDB_CODE_OUT_OF_MEMORY;
}
code = uv_prepare_init(pThrd->loop, pThrd->prepare);
if (code != 0) {
tError("failed to init prepare since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
code = uv_prepare_start(pThrd->prepare, uvPrepareCb);
if (code != 0) {
tError("failed to start prepare since %s", uv_err_name(code));
return TSDB_CODE_THIRDPARTY_ERROR;
}
pThrd->prepare->data = pThrd;
// conn set
QUEUE_INIT(&pThrd->conn);
@ -1421,11 +1360,11 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto End;
}
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
code = TAOS_SYSTEM_ERROR(errno);
tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());
goto End;
}
// if (false == taosValidIpAndPort(srv->ip, srv->port)) {
// code = TAOS_SYSTEM_ERROR(errno);
// tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());
// goto End;
// }
char pipeName[PATH_MAX];
#if defined(WINDOWS) || defined(DARWIN)
@ -1485,12 +1424,14 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
code = TSDB_CODE_OUT_OF_MEMORY;
goto End;
}
srv->pThreadObj[i] = thrd;
thrd->pTransInst = shandle;
thrd->quit = false;
thrd->pTransInst = shandle;
thrd->pWhiteList = uvWhiteListCreate();
if (thrd->pWhiteList == NULL) {
destroyWorkThrdObj(thrd);
code = TSDB_CODE_OUT_OF_MEMORY;
goto End;
}
@ -1501,8 +1442,6 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto End;
}
srv->pThreadObj[i] = thrd;
uv_os_sock_t fds[2];
if ((code = uv_socketpair(SOCK_STREAM, 0, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE)) != 0) {
tError("failed to create pipe, errmsg: %s", uv_err_name(code));
@ -1539,6 +1478,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
tError("failed to create worker-thread:%d", i);
goto End;
}
thrd->inited = 1;
}
#endif
@ -1560,6 +1500,12 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->inited = true;
return srv;
End:
for (int i = 0; i < srv->numOfThreads; i++) {
if (srv->pThreadObj[i] != NULL) {
SWorkThrd* thrd = srv->pThreadObj[i];
destroyWorkThrd(thrd);
}
}
transCloseServer(srv);
terrno = code;
return NULL;
@ -1663,20 +1609,27 @@ void uvHandleUpdate(SSvrMsg* msg, SWorkThrd* thrd) {
taosMemoryFree(msg);
}
void destroyWorkThrdObj(SWorkThrd* pThrd) {
if (pThrd == NULL) {
return;
}
transAsyncPoolDestroy(pThrd->asyncPool);
uvWhiteListDestroy(pThrd->pWhiteList);
uv_loop_close(pThrd->loop);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
}
void destroyWorkThrd(SWorkThrd* pThrd) {
if (pThrd == NULL) {
return;
}
(void)taosThreadJoin(pThrd->thread, NULL);
SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsgWrapper, NULL);
transAsyncPoolDestroy(pThrd->asyncPool);
uvWhiteListDestroy(pThrd->pWhiteList);
taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
if (pThrd->inited) {
sendQuitToWorkThrd(pThrd);
(void)taosThreadJoin(pThrd->thread, NULL);
SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsgWrapper, NULL);
}
destroyWorkThrdObj(pThrd);
}
void sendQuitToWorkThrd(SWorkThrd* pThrd) {
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
@ -1693,13 +1646,15 @@ void transCloseServer(void* arg) {
tDebug("send quit msg to accept thread");
(void)uv_async_send(srv->pAcceptAsync);
(void)taosThreadJoin(srv->thread, NULL);
SRV_RELEASE_UV(srv->loop);
(void)uv_loop_close(srv->loop);
for (int i = 0; i < srv->numOfThreads; i++) {
sendQuitToWorkThrd(srv->pThreadObj[i]);
destroyWorkThrd(srv->pThreadObj[i]);
}
} else {
SRV_RELEASE_UV(srv->loop);
(void)uv_loop_close(srv->loop);
}
@ -1708,7 +1663,9 @@ void transCloseServer(void* arg) {
taosMemoryFree(srv->loop);
for (int i = 0; i < srv->numOfThreads; i++) {
taosMemoryFree(srv->pipe[i]);
if (srv->pipe[i] != NULL) {
taosMemoryFree(srv->pipe[i]);
}
}
taosMemoryFree(srv->pipe);