fix ahandle mem leak

This commit is contained in:
yihaoDeng 2022-11-11 19:16:51 +08:00
parent 8fa39accdf
commit 52b28fbceb
4 changed files with 31 additions and 6 deletions

View File

@ -61,7 +61,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray); int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) { for (int32_t i = 0; i < numOfBatchs; ++i) {
SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i); SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs, rsp->uid); tscDebug("hb db rsp, db:%s, vgVersion:%d, stateTs:%" PRId64 ", uid:%" PRIx64, rsp->db, rsp->vgVersion, rsp->stateTs,
rsp->uid);
if (rsp->vgVersion < 0) { if (rsp->vgVersion < 0) {
code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
@ -293,6 +294,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
taosThreadMutexUnlock(&appInfo.mutex); taosThreadMutexUnlock(&appInfo.mutex);
tscError("cluster not exist, key:%s", key); tscError("cluster not exist, key:%s", key);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tFreeClientHbBatchRsp(&pRsp); tFreeClientHbBatchRsp(&pRsp);
return -1; return -1;
} }
@ -322,6 +324,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
tFreeClientHbBatchRsp(&pRsp); tFreeClientHbBatchRsp(&pRsp);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
return code; return code;
} }

View File

@ -96,7 +96,7 @@ typedef void* queue[2];
//#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit //#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
//#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) //#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3000 // connect timeout (s) #define TRANS_CONN_TIMEOUT 3000 // connect timeout (ms)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 #define TRANS_PACKET_LIMIT 1024 * 1024 * 512

View File

@ -80,6 +80,7 @@ typedef struct SCliThrd {
uint64_t nextTimeout; // next timeout uint64_t nextTimeout; // next timeout
void* pTransInst; // void* pTransInst; //
void (*destroyAhandleFp)(void* ahandle);
SHashObj* fqdn2ipCache; SHashObj* fqdn2ipCache;
SCvtAddr cvtAddr; SCvtAddr cvtAddr;
@ -158,6 +159,7 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq,
static FORCE_INLINE void destroyUserdata(STransMsg* userdata); static FORCE_INLINE void destroyUserdata(STransMsg* userdata);
static FORCE_INLINE void destroyCmsg(void* cmsg); static FORCE_INLINE void destroyCmsg(void* cmsg);
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst); static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx); static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
@ -569,7 +571,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
if (conn->list->size >= 50) { if (conn->list->size >= 50) {
STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg));
arg->param1 = conn; arg->param1 = conn;
arg->param2 = NULL; arg->param2 = thrd;
STrans* pTransInst = thrd->pTransInst; STrans* pTransInst = thrd->pTransInst;
conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime));
@ -693,8 +695,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
} }
if (conn->timer != NULL) { if (conn->timer != NULL) {
uv_timer_stop(conn->timer); uv_timer_stop(conn->timer);
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer->data = NULL; conn->timer->data = NULL;
taosArrayPush(pThrd->timerList, &conn->timer);
conn->timer = NULL; conn->timer = NULL;
} }
@ -1213,6 +1215,25 @@ static FORCE_INLINE void destroyCmsg(void* arg) {
taosMemoryFree(pMsg); taosMemoryFree(pMsg);
} }
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
if (param == NULL) return;
STaskArg* arg = param;
SCliMsg* pMsg = arg->param1;
SCliThrd* pThrd = arg->param2;
tDebug("destroy Ahandle A");
if (pThrd != NULL && pThrd->destroyAhandleFp != NULL) {
tDebug("destroy Ahandle B");
pThrd->destroyAhandleFp(pMsg->ctx->ahandle);
}
tDebug("destroy Ahandle C");
transDestroyConnCtx(pMsg->ctx);
destroyUserdata(&pMsg->msg);
taosMemoryFree(pMsg);
}
static SCliThrd* createThrdObj(void* trans) { static SCliThrd* createThrdObj(void* trans) {
STrans* pTransInst = trans; STrans* pTransInst = trans;
@ -1247,6 +1268,7 @@ static SCliThrd* createThrdObj(void* trans) {
pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
pThrd->pTransInst = trans; pThrd->pTransInst = trans;
pThrd->destroyAhandleFp = pTransInst->destroyFp;
pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pThrd->fqdn2ipCache = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
pThrd->quit = false; pThrd->quit = false;
return pThrd; return pThrd;
@ -1262,7 +1284,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
transAsyncPoolDestroy(pThrd->asyncPool); transAsyncPoolDestroy(pThrd->asyncPool);
transDQDestroy(pThrd->delayQueue, destroyCmsg); transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
transDQDestroy(pThrd->timeoutQueue, NULL); transDQDestroy(pThrd->timeoutQueue, NULL);
tDebug("thread destroy %" PRId64, pThrd->pid); tDebug("thread destroy %" PRId64, pThrd->pid);

View File

@ -497,7 +497,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
SDelayTask* task = container_of(minNode, SDelayTask, node); SDelayTask* task = container_of(minNode, SDelayTask, node);
STaskArg* arg = task->arg; STaskArg* arg = task->arg;
if (freeFunc) freeFunc(arg->param1); if (freeFunc) freeFunc(arg);
taosMemoryFree(arg); taosMemoryFree(arg);
taosMemoryFree(task); taosMemoryFree(task);