fix: fix memory leak issue

This commit is contained in:
dapan1121 2022-08-08 20:15:21 +08:00
parent 7d0f42aa0d
commit ea4904fb40
8 changed files with 25 additions and 7 deletions

View File

@ -327,7 +327,13 @@ int32_t hbBuildQueryDesc(SQueryHbReqBasic *hbBasic, STscObj *pObj) {
while (pIter != NULL) { while (pIter != NULL) {
int64_t *rid = pIter; int64_t *rid = pIter;
SRequestObj *pRequest = acquireRequest(*rid); SRequestObj *pRequest = acquireRequest(*rid);
if (NULL == pRequest || pRequest->killed) { if (NULL == pRequest) {
pIter = taosHashIterate(pObj->pRequests, pIter);
continue;
}
if (pRequest->killed) {
releaseRequest(*rid);
pIter = taosHashIterate(pObj->pRequests, pIter); pIter = taosHashIterate(pObj->pRequests, pIter);
continue; continue;
} }

View File

@ -283,7 +283,7 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
int32_t code = qExecCommand(pQuery->pRoot, &pRsp); int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) { if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, false); code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false, true);
} }
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
@ -1348,6 +1348,10 @@ int32_t doProcessMsgFromServer(void* param) {
} else { } else {
memcpy(buf.pData, pMsg->pCont, pMsg->contLen); memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
} }
tscDebug("xxxxx malloc %p, message: %s, size:%d, code: %s, gtid: %s", buf.pData,
TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code), tbuf);
} }
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code); pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);

View File

@ -182,6 +182,7 @@ void taos_free_result(TAOS_RES *res) {
if (TD_RES_QUERY(res)) { if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
tscDebug("0x%" PRIx64 " taos_free_result start to free query", pRequest->requestId);
destroyRequest(pRequest); destroyRequest(pRequest);
} else if (TD_RES_TMQ(res)) { } else if (TD_RES_TMQ(res)) {
SMqRspObj *pRsp = (SMqRspObj *)res; SMqRspObj *pRsp = (SMqRspObj *)res;
@ -482,7 +483,7 @@ void taos_stop_query(TAOS_RES *res) {
int32_t numOfFields = taos_num_fields(pRequest); int32_t numOfFields = taos_num_fields(pRequest);
// It is not a query, no need to stop. // It is not a query, no need to stop.
if (numOfFields == 0) { if (numOfFields == 0) {
tscDebug("request %" PRIx64 " no need to be killed since not query", pRequest->requestId); tscDebug("request 0x%" PRIx64 " no need to be killed since not query", pRequest->requestId);
return; return;
} }
@ -847,7 +848,7 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
} }
pRequest->code = pRequest->code =
setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, false); setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp *)pResultInfo->pData, pResultInfo->convertUcs4, true);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0; pResultInfo->numOfRows = 0;
pRequest->code = code; pRequest->code = code;

View File

@ -389,7 +389,7 @@ int32_t processShowVariablesRsp(void* param, SDataBuf* pMsg, int32_t code) {
code = buildShowVariablesRsp(rsp.variables, &pRes); code = buildShowVariablesRsp(rsp.variables, &pRes);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, false); code = setQueryResultFromRsp(&pRequest->body.resInfo, pRes, false, true);
} }
tFreeSShowVariablesRsp(&rsp); tFreeSShowVariablesRsp(&rsp);

View File

@ -415,7 +415,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t metaSize = (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema); int32_t metaSize = sizeof(STableMeta) + (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema);
*pDst = taosMemoryMalloc(metaSize); *pDst = taosMemoryMalloc(metaSize);
if (NULL == *pDst) { if (NULL == *pDst) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;

View File

@ -389,6 +389,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
_return: _return:
qDebug("xxxxx free %p", pMsg->pData);
taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(pMsg->pData);
qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle,
@ -402,6 +403,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId,
code); code);
if (pMsg) { if (pMsg) {
qDebug("xxxxx free %p", pMsg->pData);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -414,6 +416,8 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
qDebug("handle %p is broken", pMsg->handle); qDebug("handle %p is broken", pMsg->handle);
if (head->isHbParam) { if (head->isHbParam) {
qDebug("xxxxx free %p", pMsg->pData);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param; SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
@ -456,6 +460,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
_return: _return:
tFreeSSchedulerHbRsp(&rsp); tFreeSSchedulerHbRsp(&rsp);
qDebug("xxxxx free %p", pMsg->pData);
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
SCH_RET(code); SCH_RET(code);
} }

View File

@ -411,7 +411,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
if (pJob->fetched) { if (pJob->fetched) {
SCH_UNLOCK(SCH_WRITE, &pJob->resLock); SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode)); SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode));
SCH_ERR_RET(rspCode); SCH_ERR_JRET(rspCode);
} }
SCH_UNLOCK(SCH_WRITE, &pJob->resLock); SCH_UNLOCK(SCH_WRITE, &pJob->resLock);

View File

@ -154,6 +154,8 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) {
return; return;
} }
SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", errCode:0x%x", *jobId, errCode);
schHandleJobDrop(pJob, errCode); schHandleJobDrop(pJob, errCode);
schReleaseJob(*jobId); schReleaseJob(*jobId);