fix: fix taosc memory leak
This commit is contained in:
parent
077fbd6787
commit
03e9b15237
|
@ -162,9 +162,12 @@ typedef struct SRequestConnInfo {
|
|||
SEpSet mgmtEps;
|
||||
} SRequestConnInfo;
|
||||
|
||||
typedef void (*__freeFunc)(void *param);
|
||||
|
||||
typedef struct SMsgSendInfo {
|
||||
__async_send_cb_fn_t fp; // async callback function
|
||||
STargetInfo target; // for update epset
|
||||
__freeFunc paramFreeFp;
|
||||
void* param;
|
||||
uint64_t requestId;
|
||||
uint64_t requestObjRefId;
|
||||
|
@ -188,6 +191,8 @@ int32_t cleanupTaskQueue();
|
|||
*/
|
||||
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
|
||||
|
||||
void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
|
||||
|
||||
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
|
||||
bool persistHandle, void* ctx);
|
||||
|
||||
|
|
|
@ -286,13 +286,10 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
|
|||
if (pInst == NULL || NULL == *pInst) {
|
||||
taosThreadMutexUnlock(&appInfo.mutex);
|
||||
tscError("cluster not exist, key:%s", key);
|
||||
taosMemoryFreeClear(param);
|
||||
tFreeClientHbBatchRsp(&pRsp);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(param);
|
||||
|
||||
if (code != 0) {
|
||||
(*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 0 : -1);
|
||||
tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, (*pInst)->totalDnodes);
|
||||
|
@ -716,6 +713,7 @@ static void *hbThreadFunc(void *param) {
|
|||
pInfo->msgInfo.len = tlen;
|
||||
pInfo->msgType = TDMT_MND_HEARTBEAT;
|
||||
pInfo->param = strdup(pAppHbMgr->key);
|
||||
pInfo->paramFreeFp = taosMemoryFree;
|
||||
pInfo->requestId = generateRequestId();
|
||||
pInfo->requestObjRefId = 0;
|
||||
|
||||
|
|
|
@ -29,7 +29,6 @@
|
|||
|
||||
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
|
||||
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
|
||||
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
|
||||
|
||||
static bool stringLengthCheck(const char* str, size_t maxsize) {
|
||||
if (str == NULL) {
|
||||
|
@ -1215,13 +1214,6 @@ static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
|
|||
return pMsgSendInfo;
|
||||
}
|
||||
|
||||
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
|
||||
assert(pMsgBody != NULL);
|
||||
taosMemoryFreeClear(pMsgBody->target.dbFName);
|
||||
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
|
||||
taosMemoryFreeClear(pMsgBody);
|
||||
}
|
||||
|
||||
void updateTargetEpSet(SMsgSendInfo* pSendInfo, STscObj* pTscObj, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||
if (NULL == pEpSet) {
|
||||
return;
|
||||
|
|
|
@ -255,6 +255,8 @@ int32_t processDropDbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
|
||||
}
|
||||
|
||||
taosMemoryFree(pMsg->pData);
|
||||
|
||||
if (pRequest->body.queryFp != NULL) {
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
} else {
|
||||
|
@ -278,6 +280,8 @@ int32_t processAlterStbRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
pRequest->body.resInfo.execRes.res = alterRsp.pMeta;
|
||||
}
|
||||
|
||||
taosMemoryFree(pMsg->pData);
|
||||
|
||||
if (pRequest->body.queryFp != NULL) {
|
||||
SExecResult* pRes = &pRequest->body.resInfo.execRes;
|
||||
|
||||
|
@ -387,6 +391,8 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tFreeSShowVariablesRsp(&rsp);
|
||||
}
|
||||
|
||||
taosMemoryFree(pMsg->pData);
|
||||
|
||||
if (pRequest->body.queryFp != NULL) {
|
||||
pRequest->body.queryFp(pRequest->body.param, pRequest, code);
|
||||
} else {
|
||||
|
|
|
@ -506,6 +506,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
|
|||
pMsgSendInfo->requestId = generateRequestId();
|
||||
pMsgSendInfo->requestObjRefId = 0;
|
||||
pMsgSendInfo->param = pParam;
|
||||
pMsgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
pMsgSendInfo->fp = tmqCommitCb2;
|
||||
pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET;
|
||||
// send msg
|
||||
|
@ -1516,6 +1517,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
|||
sendInfo->requestId = generateRequestId();
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = pParam;
|
||||
sendInfo->paramFreeFp = taosMemoryFree;
|
||||
sendInfo->fp = tmqAskEpCb;
|
||||
sendInfo->msgType = TDMT_MND_MQ_ASK_EP;
|
||||
|
||||
|
|
|
@ -532,6 +532,14 @@ typedef struct SCtgOperation {
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define CTG_API_JENTER() do { \
|
||||
CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); \
|
||||
CTG_LOCK(CTG_READ, &gCtgMgmt.lock); \
|
||||
if (atomic_load_8((int8_t*)&gCtgMgmt.exit)) { \
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_OUT_OF_SERVICE); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
|
||||
#define CTG_API_LEAVE_NOLOCK(c) do { \
|
||||
int32_t __code = c; \
|
||||
|
|
|
@ -244,10 +244,11 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
|
|||
int32_t ctgHandleMsgCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
|
||||
SCtgTaskCallbackParam* cbParam = (SCtgTaskCallbackParam*)param;
|
||||
int32_t code = 0;
|
||||
SCtgJob* pJob = NULL;
|
||||
|
||||
CTG_API_ENTER();
|
||||
CTG_API_JENTER();
|
||||
|
||||
SCtgJob* pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
|
||||
pJob = taosAcquireRef(gCtgMgmt.jobPool, cbParam->refId);
|
||||
if (NULL == pJob) {
|
||||
qDebug("ctg job refId 0x%" PRIx64 " already dropped", cbParam->refId);
|
||||
goto _return;
|
||||
|
@ -266,8 +267,6 @@ _return:
|
|||
if (pJob) {
|
||||
taosReleaseRef(gCtgMgmt.jobPool, cbParam->refId);
|
||||
}
|
||||
|
||||
taosMemoryFree(param);
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
@ -293,6 +292,7 @@ int32_t ctgMakeMsgSendInfo(SCtgTask* pTask, int32_t msgType, SMsgSendInfo **pMsg
|
|||
param->taskId = pTask->taskId;
|
||||
|
||||
msgSendInfo->param = param;
|
||||
msgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
msgSendInfo->fp = ctgHandleMsgCallback;
|
||||
|
||||
*pMsgSendInfo = msgSendInfo;
|
||||
|
|
|
@ -110,6 +110,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs
|
|||
pParam->pInserter = pInserter;
|
||||
|
||||
pMsgSendInfo->param = pParam;
|
||||
pMsgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
pMsgSendInfo->msgInfo.pData = pMsg;
|
||||
pMsgSendInfo->msgInfo.len = ntohl(pMsg->length);
|
||||
pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
|
||||
|
|
|
@ -1990,12 +1990,6 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
|
||||
assert(pMsgBody != NULL);
|
||||
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
|
||||
taosMemoryFreeClear(pMsgBody);
|
||||
}
|
||||
|
||||
void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||
SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle;
|
||||
assert(pMsg->info.ahandle != NULL);
|
||||
|
@ -2055,6 +2049,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
|
|||
pWrapper->sourceIndex = sourceIndex;
|
||||
|
||||
pMsgSendInfo->param = pWrapper;
|
||||
pMsgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
pMsgSendInfo->msgInfo.pData = pMsg;
|
||||
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
|
||||
pMsgSendInfo->msgType = pSource->fetchMsgType;
|
||||
|
|
|
@ -1923,15 +1923,18 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode**
|
|||
return partitionLogicCond(pCondition, pPrimaryKeyCond, pTagIndexCond, pTagCond, pOtherCond);
|
||||
}
|
||||
|
||||
bool needOutput = false;
|
||||
switch (classifyCondition(*pCondition)) {
|
||||
case COND_TYPE_PRIMARY_KEY:
|
||||
if (NULL != pPrimaryKeyCond) {
|
||||
*pPrimaryKeyCond = *pCondition;
|
||||
needOutput = true;
|
||||
}
|
||||
break;
|
||||
case COND_TYPE_TAG_INDEX:
|
||||
if (NULL != pTagIndexCond) {
|
||||
*pTagIndexCond = *pCondition;
|
||||
needOutput = true;
|
||||
}
|
||||
if (NULL != pTagCond) {
|
||||
SNode* pTempCond = *pCondition;
|
||||
|
@ -1942,21 +1945,26 @@ int32_t nodesPartitionCond(SNode** pCondition, SNode** pPrimaryKeyCond, SNode**
|
|||
}
|
||||
}
|
||||
*pTagCond = pTempCond;
|
||||
needOutput = true;
|
||||
}
|
||||
break;
|
||||
case COND_TYPE_TAG:
|
||||
if (NULL != pTagCond) {
|
||||
*pTagCond = *pCondition;
|
||||
needOutput = true;
|
||||
}
|
||||
break;
|
||||
case COND_TYPE_NORMAL:
|
||||
default:
|
||||
if (NULL != pOtherCond) {
|
||||
*pOtherCond = *pCondition;
|
||||
needOutput = true;
|
||||
}
|
||||
break;
|
||||
}
|
||||
*pCondition = NULL;
|
||||
if (needOutput) {
|
||||
*pCondition = NULL;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -138,6 +138,16 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code)
|
|||
return 0;
|
||||
}
|
||||
|
||||
void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
|
||||
assert(pMsgBody != NULL);
|
||||
taosMemoryFreeClear(pMsgBody->target.dbFName);
|
||||
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
|
||||
if (pMsgBody->paramFreeFp) {
|
||||
(*pMsgBody->paramFreeFp)(pMsgBody->param);
|
||||
}
|
||||
taosMemoryFreeClear(pMsgBody);
|
||||
}
|
||||
|
||||
int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo,
|
||||
bool persistHandle, void* rpcCtx) {
|
||||
char* pMsg = rpcMallocCont(pInfo->msgInfo.len);
|
||||
|
|
|
@ -386,7 +386,6 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
|
|||
schProcessOnCbEnd(pJob, pTask, code);
|
||||
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
taosMemoryFreeClear(param);
|
||||
|
||||
qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle,
|
||||
tstrerror(rspCode));
|
||||
|
@ -398,7 +397,6 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
|||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId,
|
||||
code);
|
||||
taosMemoryFreeClear(param);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -447,8 +445,8 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
|||
SCH_ERR_JRET(schProcessOnTaskStatusRsp(&rsp.epId, rsp.taskStatus));
|
||||
|
||||
_return:
|
||||
|
||||
tFreeSSchedulerHbRsp(&rsp);
|
||||
taosMemoryFree(param);
|
||||
taosMemoryFree(pMsg->pData);
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
@ -514,7 +512,9 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
msgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
SCH_ERR_JRET(schMakeCallbackParam(pJob, pTask, msgType, isHb, trans, &msgSendInfo->param));
|
||||
|
||||
SCH_ERR_JRET(schGetCallbackFp(msgType, &msgSendInfo->fp));
|
||||
|
||||
if (pJob) {
|
||||
|
@ -535,7 +535,7 @@ int32_t schGenerateCallBackInfo(SSchJob *pJob, SSchTask *pTask, void *msg, uint3
|
|||
|
||||
_return:
|
||||
|
||||
schFreeSMsgSendInfo(msgSendInfo);
|
||||
destroySendMsgInfo(msgSendInfo);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
@ -676,6 +676,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
|||
param->pTrans = pJob->conn.pTrans;
|
||||
|
||||
pMsgSendInfo->param = param;
|
||||
pMsgSendInfo->paramFreeFp = taosMemoryFree;
|
||||
pMsgSendInfo->fp = fp;
|
||||
|
||||
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo};
|
||||
|
@ -795,6 +796,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) {
|
|||
pDst->param = NULL;
|
||||
|
||||
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
|
||||
pDst->paramFreeFp = taosMemoryFree;
|
||||
|
||||
*dst = pDst;
|
||||
|
||||
|
|
|
@ -50,6 +50,12 @@ char* schGetOpStr(SCH_OP_TYPE type) {
|
|||
}
|
||||
}
|
||||
|
||||
void schFreeHbTrans(SSchHbTrans *pTrans) {
|
||||
rpcReleaseHandle(pTrans->trans.pHandle, TAOS_CONN_CLIENT);
|
||||
|
||||
schFreeRpcCtx(&pTrans->rpcCtx);
|
||||
}
|
||||
|
||||
void schCleanClusterHb(void* pTrans) {
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
|
||||
|
@ -57,7 +63,7 @@ void schCleanClusterHb(void* pTrans) {
|
|||
while (hb) {
|
||||
if (hb->trans.pTrans == pTrans) {
|
||||
SQueryNodeEpId* pEpId = taosHashGetKey(hb, NULL);
|
||||
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);
|
||||
schFreeHbTrans(hb);
|
||||
taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
|
||||
}
|
||||
|
||||
|
@ -68,8 +74,6 @@ void schCleanClusterHb(void* pTrans) {
|
|||
}
|
||||
|
||||
int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *epId) {
|
||||
return TSDB_CODE_SUCCESS; // TODO ENABLE IT WHEN RPC IS READY
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
|
@ -82,7 +86,7 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep
|
|||
|
||||
int64_t taskNum = atomic_load_64(&hb->taskNum);
|
||||
if (taskNum <= 0) {
|
||||
rpcReleaseHandle(hb->trans.pHandle, TAOS_CONN_CLIENT);
|
||||
schFreeHbTrans(hb);
|
||||
taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
||||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||
|
@ -265,9 +269,7 @@ void schFreeRpcCtxVal(const void *arg) {
|
|||
}
|
||||
|
||||
SMsgSendInfo *pMsgSendInfo = (SMsgSendInfo *)arg;
|
||||
taosMemoryFreeClear(pMsgSendInfo->param);
|
||||
taosMemoryFreeClear(pMsgSendInfo->msgInfo.pData);
|
||||
taosMemoryFreeClear(pMsgSendInfo);
|
||||
destroySendMsgInfo(pMsgSendInfo);
|
||||
}
|
||||
|
||||
void schFreeRpcCtx(SRpcCtx *pCtx) {
|
||||
|
|
Loading…
Reference in New Issue