Merge pull request #27177 from taosdata/fix/3.0/TD-31337
fix exec task memory leaks
This commit is contained in:
commit
a3ec3d167b
|
@ -332,17 +332,12 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == dispatcher->pDataBlocks) {
|
|
||||||
taosMemoryFree(dispatcher);
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _return;
|
|
||||||
}
|
|
||||||
|
|
||||||
*pHandle = dispatcher;
|
*pHandle = dispatcher;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
if (dispatcher) {
|
||||||
taosMemoryFree(pManager);
|
dsDestroyDataSinker(dispatcher);
|
||||||
|
}
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
|
@ -316,7 +316,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
|
||||||
qTaskInfo_t pTaskInfo = NULL;
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE);
|
code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
nodesDestroyNode((SNode*)pPlan);
|
|
||||||
qDestroyTask(pTaskInfo);
|
qDestroyTask(pTaskInfo);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -352,7 +351,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
|
||||||
qTaskInfo_t pTaskInfo = NULL;
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM);
|
code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
nodesDestroyNode((SNode*)pPlan);
|
|
||||||
qDestroyTask(pTaskInfo);
|
qDestroyTask(pTaskInfo);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -360,7 +358,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
|
||||||
|
|
||||||
code = qStreamInfoResetTimewindowFilter(pTaskInfo);
|
code = qStreamInfoResetTimewindowFilter(pTaskInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
nodesDestroyNode((SNode*)pPlan);
|
|
||||||
qDestroyTask(pTaskInfo);
|
qDestroyTask(pTaskInfo);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -65,6 +65,7 @@ int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC
|
||||||
p->id.taskId = taskId;
|
p->id.taskId = taskId;
|
||||||
p->id.str = taosMemoryMalloc(64);
|
p->id.str = taosMemoryMalloc(64);
|
||||||
if (p->id.str == NULL) {
|
if (p->id.str == NULL) {
|
||||||
|
doDestroyTask(p);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,6 +101,7 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
|
||||||
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
|
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
|
||||||
int32_t code = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api, pTaskInfo);
|
int32_t code = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api, pTaskInfo);
|
||||||
if (*pTaskInfo == NULL || code != 0) {
|
if (*pTaskInfo == NULL || code != 0) {
|
||||||
|
nodesDestroyNode((SNode*)pPlan);
|
||||||
taosMemoryFree(sql);
|
taosMemoryFree(sql);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2081,6 +2081,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
|
||||||
int32_t tempRes = tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
|
int32_t tempRes = tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req);
|
||||||
if (tempRes < 0) {
|
if (tempRes < 0) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
|
taosMemoryFree(buf1);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2089,6 +2090,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca
|
||||||
if (NULL == pMsgSendInfo) {
|
if (NULL == pMsgSendInfo) {
|
||||||
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
|
qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
|
||||||
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
taosMemoryFree(buf1);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -778,11 +778,13 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
|
||||||
sql = NULL;
|
sql = NULL;
|
||||||
if (code) {
|
if (code) {
|
||||||
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
|
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
|
||||||
|
qDestroyTask(pTaskInfo);
|
||||||
QW_ERR_JRET(code);
|
QW_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == sinkHandle || NULL == pTaskInfo) {
|
if (NULL == sinkHandle || NULL == pTaskInfo) {
|
||||||
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
|
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
|
||||||
|
qDestroyTask(pTaskInfo);
|
||||||
QW_ERR_JRET(TSDB_CODE_APP_ERROR);
|
QW_ERR_JRET(TSDB_CODE_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1277,11 +1279,13 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
|
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
|
||||||
|
qDestroyTask(pTaskInfo);
|
||||||
QW_ERR_JRET(code);
|
QW_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (NULL == sinkHandle || NULL == pTaskInfo) {
|
if (NULL == sinkHandle || NULL == pTaskInfo) {
|
||||||
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
|
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
|
||||||
|
qDestroyTask(pTaskInfo);
|
||||||
QW_ERR_JRET(TSDB_CODE_APP_ERROR);
|
QW_ERR_JRET(TSDB_CODE_APP_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue