diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 9316ba960e..3964422411 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -332,17 +332,12 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD goto _return; } - if (NULL == dispatcher->pDataBlocks) { - taosMemoryFree(dispatcher); - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _return; - } - *pHandle = dispatcher; return TSDB_CODE_SUCCESS; _return: - - taosMemoryFree(pManager); + if (dispatcher) { + dsDestroyDataSinker(dispatcher); + } return terrno; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index dc0baebd66..b1c9207ab7 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -316,7 +316,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 qTaskInfo_t pTaskInfo = NULL; code = qCreateExecTask(pReaderHandle, vgId, 0, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_QUEUE); if (code != TSDB_CODE_SUCCESS) { - nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); terrno = code; return NULL; @@ -352,7 +351,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v qTaskInfo_t pTaskInfo = NULL; code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { - nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); terrno = code; return NULL; @@ -360,7 +358,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v code = qStreamInfoResetTimewindowFilter(pTaskInfo); if (code != TSDB_CODE_SUCCESS) { - nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); terrno = code; return NULL; diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 7100e10276..881b4f9316 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -65,6 +65,7 @@ int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC p->id.taskId = taskId; p->id.str = taosMemoryMalloc(64); if (p->id.str == NULL) { + doDestroyTask(p); return terrno; } @@ -100,6 +101,7 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) { int32_t code = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api, pTaskInfo); if (*pTaskInfo == NULL || code != 0) { + nodesDestroyNode((SNode*)pPlan); taosMemoryFree(sql); return code; } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 45ff4e5f88..d652af2768 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -2081,6 +2081,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca int32_t tempRes = tSerializeSRetrieveTableReq(buf1, contLen, &pInfo->req); if (tempRes < 0) { code = terrno; + taosMemoryFree(buf1); return NULL; } @@ -2089,6 +2090,7 @@ static SSDataBlock* sysTableScanFromMNode(SOperatorInfo* pOperator, SSysTableSca if (NULL == pMsgSendInfo) { qError("%s prepare message %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo)); pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(buf1); return NULL; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 1311179e92..2526ac73bc 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -778,11 +778,13 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { sql = NULL; if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); + qDestroyTask(pTaskInfo); QW_ERR_JRET(code); } if (NULL == sinkHandle || NULL == pTaskInfo) { QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle); + qDestroyTask(pTaskInfo); QW_ERR_JRET(TSDB_CODE_APP_ERROR); } @@ -1277,11 +1279,13 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); + qDestroyTask(pTaskInfo); QW_ERR_JRET(code); } if (NULL == sinkHandle || NULL == pTaskInfo) { QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle); + qDestroyTask(pTaskInfo); QW_ERR_JRET(TSDB_CODE_APP_ERROR); }