fix mem leak while taosd quit
This commit is contained in:
parent
593a10d81d
commit
a0c77f5baa
|
@ -87,9 +87,9 @@ static bool dmIsForbiddenIp(int8_t forbidden, char *user, uint32_t clientIp) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans * pTrans = &pDnode->trans;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SRpcMsg *pMsg = NULL;
|
SRpcMsg * pMsg = NULL;
|
||||||
SMgmtWrapper *pWrapper = NULL;
|
SMgmtWrapper *pWrapper = NULL;
|
||||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
|
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)];
|
||||||
|
|
||||||
|
@ -256,11 +256,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) {
|
||||||
|
|
||||||
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) {
|
||||||
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype];
|
||||||
SArray *pArray = (*pWrapper->func.getHandlesFp)();
|
SArray * pArray = (*pWrapper->func.getHandlesFp)();
|
||||||
if (pArray == NULL) return -1;
|
if (pArray == NULL) return -1;
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||||
SMgmtHandle *pMgmt = taosArrayGet(pArray, i);
|
SMgmtHandle * pMgmt = taosArrayGet(pArray, i);
|
||||||
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
|
SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)];
|
||||||
if (pMgmt->needCheckVgId) {
|
if (pMgmt->needCheckVgId) {
|
||||||
pHandle->needCheckVgId = pMgmt->needCheckVgId;
|
pHandle->needCheckVgId = pMgmt->needCheckVgId;
|
||||||
|
@ -345,6 +345,7 @@ int32_t dmInitClient(SDnode *pDnode) {
|
||||||
rpcInit.parent = pDnode;
|
rpcInit.parent = pDnode;
|
||||||
rpcInit.rfp = rpcRfp;
|
rpcInit.rfp = rpcRfp;
|
||||||
rpcInit.compressSize = tsCompressMsgSize;
|
rpcInit.compressSize = tsCompressMsgSize;
|
||||||
|
rpcInit.dfp = destroyAhandle;
|
||||||
|
|
||||||
rpcInit.retryMinInterval = tsRedirectPeriod;
|
rpcInit.retryMinInterval = tsRedirectPeriod;
|
||||||
rpcInit.retryStepFactor = tsRedirectFactor;
|
rpcInit.retryStepFactor = tsRedirectFactor;
|
||||||
|
|
|
@ -256,21 +256,21 @@ void transAsyncPoolDestroy(SAsyncPool* pool);
|
||||||
int transAsyncSend(SAsyncPool* pool, queue* mq);
|
int transAsyncSend(SAsyncPool* pool, queue* mq);
|
||||||
bool transAsyncPoolIsEmpty(SAsyncPool* pool);
|
bool transAsyncPoolIsEmpty(SAsyncPool* pool);
|
||||||
|
|
||||||
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc) \
|
#define TRANS_DESTROY_ASYNC_POOL_MSG(pool, msgType, freeFunc, param) \
|
||||||
do { \
|
do { \
|
||||||
for (int i = 0; i < pool->nAsync; i++) { \
|
for (int i = 0; i < pool->nAsync; i++) { \
|
||||||
uv_async_t* async = &(pool->asyncs[i]); \
|
uv_async_t* async = &(pool->asyncs[i]); \
|
||||||
SAsyncItem* item = async->data; \
|
SAsyncItem* item = async->data; \
|
||||||
while (!QUEUE_IS_EMPTY(&item->qmsg)) { \
|
while (!QUEUE_IS_EMPTY(&item->qmsg)) { \
|
||||||
tTrace("destroy msg in async pool "); \
|
tTrace("destroy msg in async pool "); \
|
||||||
queue* h = QUEUE_HEAD(&item->qmsg); \
|
queue* h = QUEUE_HEAD(&item->qmsg); \
|
||||||
QUEUE_REMOVE(h); \
|
QUEUE_REMOVE(h); \
|
||||||
msgType* msg = QUEUE_DATA(h, msgType, q); \
|
msgType* msg = QUEUE_DATA(h, msgType, q); \
|
||||||
if (msg != NULL) { \
|
if (msg != NULL) { \
|
||||||
freeFunc(msg); \
|
freeFunc(msg, param); \
|
||||||
} \
|
} \
|
||||||
} \
|
} \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define ASYNC_CHECK_HANDLE(exh1, id) \
|
#define ASYNC_CHECK_HANDLE(exh1, id) \
|
||||||
|
|
|
@ -219,6 +219,8 @@ static void (*cliAsyncHandle[])(SCliMsg* pMsg, SCliThrd* pThrd) = {cliHandleReq,
|
||||||
/// NULL,cliHandleUpdate};
|
/// NULL,cliHandleUpdate};
|
||||||
|
|
||||||
static FORCE_INLINE void destroyCmsg(void* cmsg);
|
static FORCE_INLINE void destroyCmsg(void* cmsg);
|
||||||
|
|
||||||
|
static FORCE_INLINE void destroyQueuedMsg(void* arg, void* param);
|
||||||
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
|
static FORCE_INLINE void destroyCmsgAndAhandle(void* cmsg);
|
||||||
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
|
static FORCE_INLINE int cliRBChoseIdx(STrans* pTransInst);
|
||||||
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
|
static FORCE_INLINE void transDestroyConnCtx(STransConnCtx* ctx);
|
||||||
|
@ -1963,7 +1965,19 @@ static FORCE_INLINE void destroyCmsg(void* arg) {
|
||||||
transFreeMsg(pMsg->msg.pCont);
|
transFreeMsg(pMsg->msg.pCont);
|
||||||
taosMemoryFree(pMsg);
|
taosMemoryFree(pMsg);
|
||||||
}
|
}
|
||||||
|
static FORCE_INLINE void destroyQueuedMsg(void* arg, void* param) {
|
||||||
|
SCliMsg* pMsg = arg;
|
||||||
|
if (pMsg == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (param != NULL) {
|
||||||
|
SCliThrd* pThrd = param;
|
||||||
|
if (pThrd->destroyAhandleFp) (*pThrd->destroyAhandleFp)(pMsg->msg.info.ahandle);
|
||||||
|
}
|
||||||
|
transDestroyConnCtx(pMsg->ctx);
|
||||||
|
transFreeMsg(pMsg->msg.pCont);
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
|
static FORCE_INLINE void destroyCmsgAndAhandle(void* param) {
|
||||||
if (param == NULL) return;
|
if (param == NULL) return;
|
||||||
|
|
||||||
|
@ -2057,7 +2071,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
|
||||||
taosThreadJoin(pThrd->thread, NULL);
|
taosThreadJoin(pThrd->thread, NULL);
|
||||||
CLI_RELEASE_UV(pThrd->loop);
|
CLI_RELEASE_UV(pThrd->loop);
|
||||||
taosThreadMutexDestroy(&pThrd->msgMtx);
|
taosThreadMutexDestroy(&pThrd->msgMtx);
|
||||||
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
|
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyQueuedMsg, (void*)pThrd);
|
||||||
transAsyncPoolDestroy(pThrd->asyncPool);
|
transAsyncPoolDestroy(pThrd->asyncPool);
|
||||||
|
|
||||||
transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
|
transDQDestroy(pThrd->delayQueue, destroyCmsgAndAhandle);
|
||||||
|
|
|
@ -159,7 +159,7 @@ static void uvStartSendResp(SSvrMsg* msg);
|
||||||
|
|
||||||
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
|
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
|
||||||
|
|
||||||
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
|
static FORCE_INLINE void destroySmsg(SSvrMsg* smsg);
|
||||||
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
|
static FORCE_INLINE SSvrConn* createConn(void* hThrd);
|
||||||
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
static FORCE_INLINE void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
|
||||||
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
|
static FORCE_INLINE void destroyConnRegArg(SSvrConn* conn);
|
||||||
|
@ -671,7 +671,8 @@ static FORCE_INLINE void destroySmsg(SSvrMsg* smsg) {
|
||||||
transFreeMsg(smsg->msg.pCont);
|
transFreeMsg(smsg->msg.pCont);
|
||||||
taosMemoryFree(smsg);
|
taosMemoryFree(smsg);
|
||||||
}
|
}
|
||||||
static void destroyAllConn(SWorkThrd* pThrd) {
|
static FORCE_INLINE void destroySmsgWrapper(void* smsg, void* param) { destroySmsg((SSvrMsg*)smsg); }
|
||||||
|
static void destroyAllConn(SWorkThrd* pThrd) {
|
||||||
tTrace("thread %p destroy all conn ", pThrd);
|
tTrace("thread %p destroy all conn ", pThrd);
|
||||||
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
|
while (!QUEUE_IS_EMPTY(&pThrd->conn)) {
|
||||||
queue* h = QUEUE_HEAD(&pThrd->conn);
|
queue* h = QUEUE_HEAD(&pThrd->conn);
|
||||||
|
@ -1394,7 +1395,7 @@ void destroyWorkThrd(SWorkThrd* pThrd) {
|
||||||
}
|
}
|
||||||
taosThreadJoin(pThrd->thread, NULL);
|
taosThreadJoin(pThrd->thread, NULL);
|
||||||
SRV_RELEASE_UV(pThrd->loop);
|
SRV_RELEASE_UV(pThrd->loop);
|
||||||
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
|
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsgWrapper, NULL);
|
||||||
transAsyncPoolDestroy(pThrd->asyncPool);
|
transAsyncPoolDestroy(pThrd->asyncPool);
|
||||||
|
|
||||||
uvWhiteListDestroy(pThrd->pWhiteList);
|
uvWhiteListDestroy(pThrd->pWhiteList);
|
||||||
|
|
Loading…
Reference in New Issue